hadoop-hdfs-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1399950 [9/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apach...
Date Fri, 19 Oct 2012 02:28:07 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -58,11 +57,10 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@@ -89,6 +87,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto;
@@ -99,8 +99,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -126,11 +126,11 @@ public class ClientNamenodeProtocolTrans
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
 
-  public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy)
-      throws IOException {
+  public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
     rpcProxy = proxy;
   }
   
+  @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
   }
@@ -527,6 +527,17 @@ public class ClientNamenodeProtocolTrans
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+  
+  @Override
+  public long rollEdits() throws AccessControlException, IOException {
+    RollEditsRequestProto req = RollEditsRequestProto.getDefaultInstance();
+    try {
+      RollEditsResponseProto resp = rpcProxy.rollEdits(null, req);
+      return resp.getNewSegmentTxId();
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
 
   @Override
   public boolean restoreFailedStorage(String arg) 
@@ -562,21 +573,6 @@ public class ClientNamenodeProtocolTrans
   }
 
   @Override
-  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
-      throws IOException {
-    DistributedUpgradeProgressRequestProto req = 
-        DistributedUpgradeProgressRequestProto.newBuilder().
-        setAction(PBHelper.convert(action)).build();
-    try {
-      DistributedUpgradeProgressResponseProto res = rpcProxy
-          .distributedUpgradeProgress(null, req);
-      return res.hasReport() ? PBHelper.convert(res.getReport()) : null;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  @Override
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
     ListCorruptFileBlocksRequestProto.Builder req = 
@@ -814,9 +810,22 @@ public class ClientNamenodeProtocolTrans
         ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
   }
+  
+  @Override
+  public DataEncryptionKey getDataEncryptionKey() throws IOException {
+    GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto
+        .newBuilder().build();
+    try {
+      return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req)
+          .getDataEncryptionKey());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
   }
+  
 }
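
Both methods added in this file, rollEdits() and getDataEncryptionKey(), follow the translator shape used throughout the class: build a request proto, invoke the generated stub with a null RpcController, and unwrap the stub's ServiceException into the IOException that ClientProtocol declares. A stripped-down sketch of that shape (FooStub and the local ServiceException are hypothetical stand-ins, not Hadoop types):

    import java.io.IOException;

    // Hypothetical stand-ins for the generated protobuf stub and its exception;
    // only the call-and-unwrap pattern is the point here.
    class ServiceException extends Exception {
      ServiceException(Throwable t) { super(t); }
    }
    interface FooStub {
      long foo(Object controller, Object request) throws ServiceException;
    }

    public class FooTranslator {
      private final FooStub rpcProxy;
      FooTranslator(FooStub proxy) { this.rpcProxy = proxy; }

      // Same shape as rollEdits(): request proto in, ServiceException unwrapped.
      public long foo() throws IOException {
        Object req = new Object();        // stands in for a *RequestProto
        try {
          return rpcProxy.foo(null, req); // null RpcController, as in the diff
        } catch (ServiceException se) {
          Throwable c = se.getCause();    // what ProtobufHelper.getRemoteException does
          throw (c instanceof IOException) ? (IOException) c : new IOException(se);
        }
      }
    }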

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -41,8 +41,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
@@ -59,7 +57,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -253,20 +250,6 @@ public class DatanodeProtocolClientSideT
   }
 
   @Override
-  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
-      throws IOException {
-    ProcessUpgradeRequestProto req = ProcessUpgradeRequestProto.newBuilder()
-        .setCmd(PBHelper.convert(comm)).build();
-    ProcessUpgradeResponseProto resp;
-    try {
-      resp = rpcProxy.processUpgrade(NULL_CONTROLLER, req);
-    } catch (ServiceException se) {
-      throw ProtobufHelper.getRemoteException(se);
-    }
-    return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
-  }
-
-  @Override
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
     ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto
         .newBuilder();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -33,8 +33,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
@@ -56,7 +54,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -212,25 +209,6 @@ public class DatanodeProtocolServerSideT
   }
 
   @Override
-  public ProcessUpgradeResponseProto processUpgrade(RpcController controller,
-      ProcessUpgradeRequestProto request) throws ServiceException {
-    UpgradeCommand ret;
-    try {
-      UpgradeCommand cmd = request.hasCmd() ? PBHelper
-          .convert(request.getCmd()) : null;
-      ret = impl.processUpgradeCommand(cmd);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-    ProcessUpgradeResponseProto.Builder builder = 
-        ProcessUpgradeResponseProto.newBuilder();
-    if (ret != null) {
-      builder.setCmd(PBHelper.convert(ret));
-    }
-    return builder.build();
-  }
-
-  @Override
   public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
       ReportBadBlocksRequestProto request) throws ServiceException {
     List<LocatedBlockProto> lbps = request.getBlocksList();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -82,6 +82,7 @@ public class NamenodeProtocolTranslatorP
     this.rpcProxy = rpcProxy;
   }
 
+  @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Oct 19 02:25:55 2012
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -47,7 +46,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
@@ -61,7 +59,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@@ -69,6 +67,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
@@ -94,8 +93,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -103,7 +102,6 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -129,9 +127,9 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
@@ -328,12 +326,15 @@ public class PBHelper {
   }
 
   public static RemoteEditLogProto convert(RemoteEditLog log) {
-    return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
-        .setStartTxId(log.getStartTxId()).build();
+    return RemoteEditLogProto.newBuilder()
+        .setStartTxId(log.getStartTxId())
+        .setEndTxId(log.getEndTxId())
+        .setIsInProgress(log.isInProgress()).build();
   }
 
   public static RemoteEditLog convert(RemoteEditLogProto l) {
-    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
+    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId(),
+        l.getIsInProgress());
   }
 
   public static RemoteEditLogManifestProto convert(
@@ -385,8 +386,8 @@ public class PBHelper {
   public static NamespaceInfo convert(NamespaceInfoProto info) {
     StorageInfoProto storage = info.getStorageInfo();
     return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
-        info.getBlockPoolID(), storage.getCTime(), info.getDistUpgradeVersion(),
-        info.getBuildVersion(), info.getSoftwareVersion());
+        info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
+        info.getSoftwareVersion());
   }
 
   public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@@ -636,8 +637,6 @@ public class PBHelper {
       return PBHelper.convert(proto.getKeyUpdateCmd());
     case RegisterCommand:
       return REG_CMD;
-    case UpgradeCommand:
-      return PBHelper.convert(proto.getUpgradeCmd());
     }
     return null;
   }
@@ -734,11 +733,6 @@ public class PBHelper {
       builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
           PBHelper.convert((BlockCommand) datanodeCommand));
       break;
-    case DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS:
-    case DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE:
-      builder.setCmdType(DatanodeCommandProto.Type.UpgradeCommand)
-          .setUpgradeCmd(PBHelper.convert((UpgradeCommand) datanodeCommand));
-      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -746,19 +740,6 @@ public class PBHelper {
     return builder.build();
   }
 
-  public static UpgradeCommand convert(UpgradeCommandProto upgradeCmd) {
-    int action = UpgradeCommand.UC_ACTION_UNKNOWN;
-    switch (upgradeCmd.getAction()) {
-    case REPORT_STATUS:
-      action = UpgradeCommand.UC_ACTION_REPORT_STATUS;
-      break;
-    case START_UPGRADE:
-      action = UpgradeCommand.UC_ACTION_START_UPGRADE;
-    }
-    return new UpgradeCommand(action, upgradeCmd.getVersion(),
-        (short) upgradeCmd.getUpgradeStatus());
-  }
-
   public static KeyUpdateCommand convert(KeyUpdateCommandProto keyUpdateCmd) {
     return new KeyUpdateCommand(PBHelper.convert(keyUpdateCmd.getKeys()));
   }
@@ -848,28 +829,6 @@ public class PBHelper {
         .build();
   }
 
-  public static UpgradeCommandProto convert(UpgradeCommand comm) {
-    UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder();
-    if (comm == null) {
-      return builder.setAction(UpgradeCommandProto.Action.UNKNOWN)
-          .setVersion(0).setUpgradeStatus(0).build();
-    }
-    builder.setVersion(comm.getVersion()).setUpgradeStatus(
-        comm.getCurrentStatus());
-    switch (comm.getAction()) {
-    case UpgradeCommand.UC_ACTION_REPORT_STATUS:
-      builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS);
-      break;
-    case UpgradeCommand.UC_ACTION_START_UPGRADE:
-      builder.setAction(UpgradeCommandProto.Action.START_UPGRADE);
-      break;
-    default:
-      builder.setAction(UpgradeCommandProto.Action.UNKNOWN);
-      break;
-    }
-    return builder.build();
-  }
-
   public static ReceivedDeletedBlockInfo convert(
       ReceivedDeletedBlockInfoProto proto) {
     ReceivedDeletedBlockInfo.BlockStatus status = null;
@@ -894,7 +853,7 @@ public class PBHelper {
     return NamespaceInfoProto.newBuilder()
         .setBlockPoolID(info.getBlockPoolID())
         .setBuildVersion(info.getBuildVersion())
-        .setDistUpgradeVersion(info.getDistributedUpgradeVersion())
+        .setUnused(0)
         .setStorageInfo(PBHelper.convert((StorageInfo)info))
         .setSoftwareVersion(info.getSoftwareVersion()).build();
   }
@@ -970,12 +929,39 @@ public class PBHelper {
         .setIsLastBlockComplete(lb.isLastBlockComplete()).build();
   }
   
+  // DataEncryptionKey
+  public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
+    String encryptionAlgorithm = bet.getEncryptionAlgorithm();
+    return new DataEncryptionKey(bet.getKeyId(),
+        bet.getBlockPoolId(),
+        bet.getNonce().toByteArray(),
+        bet.getEncryptionKey().toByteArray(),
+        bet.getExpiryDate(),
+        encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm);
+  }
+  
+  public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
+    DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder()
+        .setKeyId(bet.keyId)
+        .setBlockPoolId(bet.blockPoolId)
+        .setNonce(ByteString.copyFrom(bet.nonce))
+        .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey))
+        .setExpiryDate(bet.expiryDate);
+    if (bet.encryptionAlgorithm != null) {
+      b.setEncryptionAlgorithm(bet.encryptionAlgorithm);
+    }
+    return b.build();
+  }
+  
   public static FsServerDefaults convert(FsServerDefaultsProto fs) {
     if (fs == null) return null;
     return new FsServerDefaults(
         fs.getBlockSize(), fs.getBytesPerChecksum(), 
         fs.getWritePacketSize(), (short) fs.getReplication(),
-        fs.getFileBufferSize());
+        fs.getFileBufferSize(),
+        fs.getEncryptDataTransfer(),
+        fs.getTrashInterval(),
+        DataChecksum.Type.valueOf(fs.getChecksumType().name()));
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -983,7 +969,13 @@ public class PBHelper {
     return FsServerDefaultsProto.newBuilder().
       setBlockSize(fs.getBlockSize()).
       setBytesPerChecksum(fs.getBytesPerChecksum()).
-      setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build();
+      setWritePacketSize(fs.getWritePacketSize())
+      .setReplication(fs.getReplication())
+      .setFileBufferSize(fs.getFileBufferSize())
+      .setEncryptDataTransfer(fs.getEncryptDataTransfer())
+      .setTrashInterval(fs.getTrashInterval())
+      .setChecksumType(ChecksumTypeProto.valueOf(fs.getChecksumType().name()))
+      .build();
   }
   
   public static FsPermissionProto convert(FsPermission p) {
@@ -1201,51 +1193,6 @@ public class PBHelper {
     }
   }
   
-  public static UpgradeActionProto convert(
-      UpgradeAction a) {
-    switch (a) {
-    case GET_STATUS:
-      return UpgradeActionProto.GET_STATUS;
-    case DETAILED_STATUS:
-      return UpgradeActionProto.DETAILED_STATUS;
-    case FORCE_PROCEED:
-      return UpgradeActionProto.FORCE_PROCEED;
-    default:
-      throw new IllegalArgumentException("Unexpected UpgradeAction :" + a);
-    }
-  }
-  
-  
-  public static UpgradeAction convert(
-      UpgradeActionProto a) {
-    switch (a) {
-    case GET_STATUS:
-      return UpgradeAction.GET_STATUS;
-    case DETAILED_STATUS:
-      return UpgradeAction.DETAILED_STATUS;
-    case FORCE_PROCEED:
-      return UpgradeAction.FORCE_PROCEED;
-    default:
-      throw new IllegalArgumentException("Unexpected UpgradeAction :" + a);
-    }
-  }
-
-  public static UpgradeStatusReportProto convert(UpgradeStatusReport r) {
-    if (r == null)
-      return null;
-    return UpgradeStatusReportProto.newBuilder()
-        .setVersion(r.getVersion())
-        .setUpgradeStatus(r.getUpgradeStatus())
-        .setFinalized(r.isFinalized())
-        .build();
-  }
-  
-  public static UpgradeStatusReport convert(UpgradeStatusReportProto r) {
-    if (r == null) return null;
-    return new UpgradeStatusReport(r.getVersion(),
-        (short) r.getUpgradeStatus(), r.getFinalized());
-  }
-  
   public static CorruptFileBlocks convert(CorruptFileBlocksProto c) {
     if (c == null)
       return null;
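
The new DataEncryptionKey converters above are inverses of each other, with one asymmetry worth noting: protobuf string fields default to "", so an unset encryptionAlgorithm is skipped on serialization and mapped from "" back to null on deserialization. A minimal round-trip check, assuming the hadoop-hdfs classes from this commit are on the classpath:

    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;

    public class DataEncryptionKeyRoundTrip {
      public static void main(String[] args) {
        DataEncryptionKey key = new DataEncryptionKey(
            42, "bp-1", new byte[] {1, 2}, new byte[] {3, 4},
            System.currentTimeMillis() + 60000L, null /* algorithm unset */);
        DataEncryptionKeyProto proto = PBHelper.convert(key);
        DataEncryptionKey back = PBHelper.convert(proto);
        // An unset algorithm travels as "" and comes back as null.
        if (back.encryptionAlgorithm != null || back.keyId != key.keyId) {
          throw new AssertionError("round trip changed the key");
        }
      }
    }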

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Fri Oct 19 02:25:55 2012
@@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager
       btsm.clearAllKeysForTesting();
     }
   }
+
+  public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) {
+    return get(blockPoolId).generateDataEncryptionKey();
+  }
+  
+  public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId,
+      byte[] nonce) throws IOException {
+    return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce);
+  }
 }
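
These additions route the new encryption-key operations through the per-block-pool lookup the rest of this class already uses, so a federated DataNode resolves the right BlockTokenSecretManager by block pool ID before delegating. In sketch form (a plain map stands in for the class's internal registry, and PoolSecretManager is hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical per-pool routing; mirrors get(blockPoolId).generateDataEncryptionKey().
    public class PerPoolRouter {
      interface PoolSecretManager { byte[] generateDataEncryptionKey(); }
      private final Map<String, PoolSecretManager> pools =
          new HashMap<String, PoolSecretManager>();
      void addBlockPool(String bpid, PoolSecretManager m) { pools.put(bpid, m); }
      byte[] generateDataEncryptionKey(String bpid) {
        return pools.get(bpid).generateDataEncryptionKey(); // route by pool id
      }
    }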

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Fri Oct 19 02:25:55 2012
@@ -118,7 +118,7 @@ public class BlockTokenIdentifier extend
     return a == null ? b == null : a.equals(b);
   }
 
-  /** {@inheritDoc} */
+  @Override
   public boolean equals(Object obj) {
     if (obj == this) {
       return true;
@@ -134,13 +134,14 @@ public class BlockTokenIdentifier extend
     return false;
   }
 
-  /** {@inheritDoc} */
+  @Override
   public int hashCode() {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
         ^ (userId == null ? 0 : userId.hashCode())
         ^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     this.cache = null;
     expiryDate = WritableUtils.readVLong(in);
@@ -155,6 +156,7 @@ public class BlockTokenIdentifier extend
     }
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVLong(out, expiryDate);
     WritableUtils.writeVInt(out, keyId);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Oct 19 02:25:55 2012
@@ -32,10 +32,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -73,6 +75,10 @@ public class BlockTokenSecretManager ext
   private BlockKey currentKey;
   private BlockKey nextKey;
   private Map<Integer, BlockKey> allKeys;
+  private String blockPoolId;
+  private String encryptionAlgorithm;
+  
+  private SecureRandom nonceGenerator = new SecureRandom();
 
   public static enum AccessMode {
     READ, WRITE, COPY, REPLACE
@@ -85,8 +91,9 @@ public class BlockTokenSecretManager ext
    * @param tokenLifetime how long an individual token is valid
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime) {
-    this(false, keyUpdateInterval, tokenLifetime);
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+    this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm);
   }
   
   /**
@@ -99,8 +106,10 @@ public class BlockTokenSecretManager ext
   * @param nnIndex the index (0 or 1) of this NN in an HA setup
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime, int nnIndex) {
-    this(true, keyUpdateInterval, tokenLifetime);
+      long tokenLifetime, int nnIndex, String blockPoolId,
+      String encryptionAlgorithm) {
+    this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm);
     Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
     this.nnIndex = nnIndex;
     setSerialNo(new SecureRandom().nextInt());
@@ -108,17 +117,24 @@ public class BlockTokenSecretManager ext
   }
   
   private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
     this.isMaster = isMaster;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
     this.allKeys = new HashMap<Integer, BlockKey>();
+    this.blockPoolId = blockPoolId;
+    this.encryptionAlgorithm = encryptionAlgorithm;
+    generateKeys();
   }
   
   @VisibleForTesting
   public synchronized void setSerialNo(int serialNo) {
     this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
   }
+  
+  public void setBlockPoolId(String blockPoolId) {
+    this.blockPoolId = blockPoolId;
+  }
 
   /** Initialize block keys */
   private synchronized void generateKeys() {
@@ -137,10 +153,10 @@ public class BlockTokenSecretManager ext
      * more.
      */
     setSerialNo(serialNo + 1);
-    currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2
+    currentKey = new BlockKey(serialNo, Time.now() + 2
         * keyUpdateInterval + tokenLifetime, generateSecret());
     setSerialNo(serialNo + 1);
-    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+    nextKey = new BlockKey(serialNo, Time.now() + 3
         * keyUpdateInterval + tokenLifetime, generateSecret());
     allKeys.put(currentKey.getKeyId(), currentKey);
     allKeys.put(nextKey.getKeyId(), nextKey);
@@ -157,7 +173,7 @@ public class BlockTokenSecretManager ext
   }
 
   private synchronized void removeExpiredKeys() {
-    long now = System.currentTimeMillis();
+    long now = Time.now();
     for (Iterator<Map.Entry<Integer, BlockKey>> it = allKeys.entrySet()
         .iterator(); it.hasNext();) {
       Map.Entry<Integer, BlockKey> e = it.next();
@@ -207,15 +223,15 @@ public class BlockTokenSecretManager ext
     removeExpiredKeys();
     // set final expiry date of retiring currentKey
     allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
-        System.currentTimeMillis() + keyUpdateInterval + tokenLifetime,
+        Time.now() + keyUpdateInterval + tokenLifetime,
         currentKey.getKey()));
     // update the estimated expiry date of new currentKey
-    currentKey = new BlockKey(nextKey.getKeyId(), System.currentTimeMillis()
+    currentKey = new BlockKey(nextKey.getKeyId(), Time.now()
         + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
     allKeys.put(currentKey.getKeyId(), currentKey);
     // generate a new nextKey
     setSerialNo(serialNo + 1);
-    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+    nextKey = new BlockKey(serialNo, Time.now() + 3
         * keyUpdateInterval + tokenLifetime, generateSecret());
     allKeys.put(nextKey.getKeyId(), nextKey);
     return true;
@@ -290,7 +306,7 @@ public class BlockTokenSecretManager ext
   }
 
   private static boolean isExpired(long expiryDate) {
-    return System.currentTimeMillis() > expiryDate;
+    return Time.now() > expiryDate;
   }
 
   /**
@@ -335,7 +351,7 @@ public class BlockTokenSecretManager ext
     }
     if (key == null)
       throw new IllegalStateException("currentKey hasn't been initialized.");
-    identifier.setExpiryDate(System.currentTimeMillis() + tokenLifetime);
+    identifier.setExpiryDate(Time.now() + tokenLifetime);
     identifier.setKeyId(key.getKeyId());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generating block token for " + identifier.toString());
@@ -370,6 +386,49 @@ public class BlockTokenSecretManager ext
     return createPassword(identifier.getBytes(), key.getKey());
   }
   
+  /**
+   * Generate a data encryption key for this block pool, using the current
+   * BlockKey.
+   * 
+   * @return a data encryption key which may be used to encrypt traffic
+   *         over the DataTransferProtocol
+   */
+  public DataEncryptionKey generateDataEncryptionKey() {
+    byte[] nonce = new byte[8];
+    nonceGenerator.nextBytes(nonce);
+    BlockKey key = null;
+    synchronized (this) {
+      key = currentKey;
+    }
+    byte[] encryptionKey = createPassword(nonce, key.getKey());
+    return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce,
+        encryptionKey, Time.now() + tokenLifetime,
+        encryptionAlgorithm);
+  }
+  
+  /**
+   * Recreate an encryption key based on the given key id and nonce.
+   * 
+   * @param keyId identifier of the secret key used to generate the encryption key.
+   * @param nonce random value used to create the encryption key
+   * @return the encryption key which corresponds to this (keyId, blockPoolId, nonce)
+   * @throws InvalidEncryptionKeyException if the block key for keyId is no longer cached
+   */
+  public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
+      throws InvalidEncryptionKeyException {
+    BlockKey key = null;
+    synchronized (this) {
+      key = allKeys.get(keyId);
+      if (key == null) {
+        throw new InvalidEncryptionKeyException("Can't re-compute encryption key"
+            + " for nonce, since the required block key (keyID=" + keyId
+            + ") doesn't exist. Current key: " + currentKey.getKeyId());
+      }
+    }
+    return createPassword(nonce, key.getKey());
+  }
+  
   @VisibleForTesting
   public synchronized void setKeyUpdateIntervalForTesting(long millis) {
     this.keyUpdateInterval = millis;
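
The two new methods above are deliberately symmetric: generateDataEncryptionKey() derives key material from the current block key and a fresh nonce, and retrieveDataEncryptionKey() repeats the same derivation from the (keyId, nonce) pair presented during the handshake, so both ends arrive at identical bytes without the key itself ever crossing the wire. A self-contained sketch of that derivation (using HmacSHA1 directly is an assumption about what SecretManager.createPassword does internally):

    import java.security.SecureRandom;
    import java.util.Arrays;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;

    public class EncryptionKeyDerivation {
      // Both sides compute HMAC(blockKey, nonce); the block key is the shared secret.
      static byte[] derive(byte[] blockKey, byte[] nonce) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(new SecretKeySpec(blockKey, "HmacSHA1"));
        return mac.doFinal(nonce);
      }

      public static void main(String[] args) throws Exception {
        SecureRandom rng = new SecureRandom();
        byte[] blockKey = new byte[20];
        byte[] nonce = new byte[8];   // same 8-byte nonce size as the patch
        rng.nextBytes(blockKey);
        rng.nextBytes(nonce);
        byte[] nnSide = derive(blockKey, nonce); // NameNode: generateDataEncryptionKey
        byte[] dnSide = derive(blockKey, nonce); // DataNode: retrieveDataEncryptionKey
        System.out.println(Arrays.equals(nnSide, dnSide)); // true
      }
    }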

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java Fri Oct 19 02:25:55 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.security.token.
 @InterfaceAudience.Private
 public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> {
 
+  @Override
   @SuppressWarnings("unchecked")
   public Token<BlockTokenIdentifier> selectToken(Text service,
       Collection<Token<? extends TokenIdentifier>> tokens) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Fri Oct 19 02:25:55 2012
@@ -78,6 +78,7 @@ public class ExportedBlockKeys implement
   static { // register a ctor
     WritableFactories.setFactory(ExportedBlockKeys.class,
         new WritableFactory() {
+          @Override
           public Writable newInstance() {
             return new ExportedBlockKeys();
           }
@@ -86,6 +87,7 @@ public class ExportedBlockKeys implement
 
   /**
    */
+  @Override
   public void write(DataOutput out) throws IOException {
     out.writeBoolean(isBlockTokenEnabled);
     out.writeLong(keyUpdateInterval);
@@ -99,6 +101,7 @@ public class ExportedBlockKeys implement
 
   /**
    */
+  @Override
   public void readFields(DataInput in) throws IOException {
     isBlockTokenEnabled = in.readBoolean();
     keyUpdateInterval = in.readLong();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Oct 19 02:25:55 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
 
 import java.io.BufferedInputStream;
@@ -24,6 +25,9 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.net.Socket;
 import java.net.URI;
 import java.text.DateFormat;
@@ -57,6 +61,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -64,7 +70,6 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.io.IOUtils;
@@ -72,9 +77,9 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import static com.google.common.base.Preconditions.checkArgument;
 
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
@@ -184,6 +189,13 @@ public class Balancer {
    */
   public static final int MAX_NUM_CONCURRENT_MOVES = 5;
   
+  private static final String USAGE = "Usage: java "
+      + Balancer.class.getSimpleName()
+      + "\n\t[-policy <policy>]\tthe balancing policy: "
+      + BalancingPolicy.Node.INSTANCE.getName() + " or "
+      + BalancingPolicy.Pool.INSTANCE.getName()
+      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+  
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
   private final double threshold;
@@ -311,11 +323,22 @@ public class Balancer {
             NetUtils.createSocketAddr(target.datanode.getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
         sock.setKeepAlive(true);
-        out = new DataOutputStream( new BufferedOutputStream(
-            sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
+        OutputStream unbufOut = sock.getOutputStream();
+        InputStream unbufIn = sock.getInputStream();
+        if (nnc.getDataEncryptionKey() != null) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn, nnc.getDataEncryptionKey());
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        in = new DataInputStream(new BufferedInputStream(unbufIn,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
         sendRequest(out);
-        in = new DataInputStream( new BufferedInputStream(
-            sock.getInputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
         LOG.info( "Moving block " + block.getBlock().getBlockId() +
@@ -377,6 +400,7 @@ public class Balancer {
     /* start a thread to dispatch the block move */
     private void scheduleBlockMove() {
       moverExecutor.execute(new Runnable() {
+        @Override
         public void run() {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Starting moving "+ block.getBlockId() +
@@ -569,6 +593,7 @@ public class Balancer {
     /* A thread that initiates a block move 
      * and waits for block move to complete */
     private class BlockMoveDispatcher implements Runnable {
+      @Override
       public void run() {
         dispatchBlocks();
       }
@@ -710,7 +735,7 @@ public class Balancer {
      */ 
     private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
     private void dispatchBlocks() {
-      long startTime = Util.now();
+      long startTime = Time.now();
       this.blocksToReceive = 2*scheduledSize;
       boolean isTimeUp = false;
       while(!isTimeUp && scheduledSize>0 &&
@@ -739,7 +764,7 @@ public class Balancer {
         } 
         
         // check if time is up or not
-        if (Util.now()-startTime > MAX_ITERATION_TIME) {
+        if (Time.now()-startTime > MAX_ITERATION_TIME) {
           isTimeUp = true;
           continue;
         }
@@ -1144,7 +1169,7 @@ public class Balancer {
    * move blocks in current window to old window.
    */ 
   private static class MovedBlocks {
-    private long lastCleanupTime = System.currentTimeMillis();
+    private long lastCleanupTime = Time.now();
     final private static int CUR_WIN = 0;
     final private static int OLD_WIN = 1;
     final private static int NUM_WINS = 2;
@@ -1175,7 +1200,7 @@ public class Balancer {
 
     /* remove old blocks */
     synchronized private void cleanup() {
-      long curTime = System.currentTimeMillis();
+      long curTime = Time.now();
       // check if old win is older than winWidth
       if (lastCleanupTime + WIN_WIDTH <= curTime) {
         // purge the old window
@@ -1472,7 +1497,7 @@ public class Balancer {
     /** Parse arguments and then run Balancer */
     @Override
     public int run(String[] args) {
-      final long startTime = Util.now();
+      final long startTime = Time.now();
       final Configuration conf = getConf();
       WIN_WIDTH = conf.getLong(
           DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
@@ -1490,7 +1515,7 @@ public class Balancer {
         System.out.println(e + ".  Exiting ...");
         return ReturnStatus.INTERRUPTED.code;
       } finally {
-        System.out.println("Balancing took " + time2Str(Util.now()-startTime));
+        System.out.println("Balancing took " + time2Str(Time.now()-startTime));
       }
     }
 
@@ -1532,7 +1557,7 @@ public class Balancer {
             }
           }
         } catch(RuntimeException e) {
-          printUsage();
+          printUsage(System.err);
           throw e;
         }
       }
@@ -1540,13 +1565,8 @@ public class Balancer {
       return new Parameters(policy, threshold);
     }
 
-    private static void printUsage() {
-      System.out.println("Usage: java " + Balancer.class.getSimpleName());
-      System.out.println("    [-policy <policy>]\tthe balancing policy: "
-          + BalancingPolicy.Node.INSTANCE.getName() + " or " 
-          + BalancingPolicy.Pool.INSTANCE.getName());
-      System.out.println(
-          "    [-threshold <threshold>]\tPercentage of disk capacity");
+    private static void printUsage(PrintStream out) {
+      out.println(USAGE + "\n");
     }
   }
 
@@ -1555,6 +1575,10 @@ public class Balancer {
    * @param args Command line arguments
    */
   public static void main(String[] args) {
+    if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
+      System.exit(0);
+    }
+
     try {
       System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args));
     } catch (Throwable e) {
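
The mover change above wraps the raw socket streams in two deliberate layers: encryption (when the NameNode hands out a key) is applied to the unbuffered streams first, and buffering goes on top of whatever comes out, so the encryptor can run its handshake against the raw socket. A generic sketch of that layering (maybeEncrypt is a hypothetical stand-in for DataTransferEncryptor.getEncryptedStreams):

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.Socket;

    public class StreamLayering {
      // Stand-in for the encryption wrapper; imagine ciphering inside.
      static OutputStream maybeEncrypt(OutputStream raw, boolean encrypt) {
        return encrypt ? new FilterOutputStream(raw) : raw;
      }

      static DataOutputStream open(Socket sock, boolean encrypt, int bufSize)
          throws IOException {
        OutputStream unbuf = sock.getOutputStream();
        unbuf = maybeEncrypt(unbuf, encrypt);   // 1. negotiate on the raw stream
        return new DataOutputStream(            // 2. buffer the resulting stream
            new BufferedOutputStream(unbuf, bufSize));
      }
    }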

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Fri Oct 19 02:25:55 2012
@@ -29,10 +29,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -60,10 +62,12 @@ class NameNodeConnector {
   final OutputStream out;
 
   private final boolean isBlockTokenEnabled;
+  private final boolean encryptDataTransfer;
   private boolean shouldRun;
   private long keyUpdaterInterval;
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
+  private DataEncryptionKey encryptionKey;
 
   NameNodeConnector(URI nameNodeUri,
       Configuration conf) throws IOException {
@@ -88,8 +92,11 @@ class NameNodeConnector {
       LOG.info("Block token params received from NN: keyUpdateInterval="
           + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
           + blockTokenLifetime / (60 * 1000) + " min(s)");
+      String encryptionAlgorithm = conf.get(
+          DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
       this.blockTokenSecretManager = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime);
+          blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
+          encryptionAlgorithm);
       this.blockTokenSecretManager.addKeys(keys);
       /*
        * Balancer should sync its block keys with NN more frequently than NN
@@ -102,7 +109,8 @@ class NameNodeConnector {
       this.shouldRun = true;
       this.keyupdaterthread.start();
     }
-
+    this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
+        .getEncryptDataTransfer();
     // Check if there is another balancer running.
     // Exit if there is another one running.
     out = checkAndMarkRunningBalancer(); 
@@ -126,6 +134,20 @@ class NameNodeConnector {
           BlockTokenSecretManager.AccessMode.COPY));
     }
   }
+  
+  DataEncryptionKey getDataEncryptionKey()
+      throws IOException {
+    if (encryptDataTransfer) {
+      synchronized (this) {
+        if (encryptionKey == null) {
+          encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
+        }
+        return encryptionKey;
+      }
+    } else {
+      return null;
+    }
+  }
 
   /* The idea for making sure that there is no more than one balancer
    * running in an HDFS is to create a file in the HDFS, writes the IP address
@@ -189,6 +211,7 @@ class NameNodeConnector {
    * Periodically updates access keys.
    */
   class BlockKeyUpdater implements Runnable {
+    @Override
     public void run() {
       try {
         while (shouldRun) {
@@ -207,4 +230,4 @@ class NameNodeConnector {
       }
     }
   }
-}
\ No newline at end of file
+}
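
getDataEncryptionKey() above generates one key lazily under the object lock and reuses it for every block move, so concurrent movers share a single key rather than minting one per transfer. The same caching pattern in isolation (names are illustrative, not Hadoop's):

    // Lazy, lock-guarded single-value cache, as in NameNodeConnector.
    public class LazyKeyCache<T> {
      public interface Factory<T> { T create(); }
      private final Factory<T> factory;
      private T cached;                 // guarded by 'this'

      public LazyKeyCache(Factory<T> factory) { this.factory = factory; }

      public synchronized T get() {
        if (cached == null) {
          cached = factory.create();    // created once, then reused
        }
        return cached;
      }
    }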

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Fri Oct 19 02:25:55 2012
@@ -19,12 +19,14 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ContentSummary;
 
 /** 
  * This interface is used by the block manager to expose a
  * few characteristics of a collection of Block/BlockUnderConstruction.
  */
+@InterfaceAudience.Private
 public interface BlockCollection {
   /**
    * Get the last block of the collection.
@@ -56,7 +58,7 @@ public interface BlockCollection {
    * Get block replication for the collection 
    * @return block replication value
    */
-  public short getReplication();
+  public short getBlockReplication();
 
   /**
    * Get the name of the collection.

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Fri Oct 19 02:25:55 2012
@@ -73,7 +73,7 @@ public class BlockInfo extends Block imp
    * @param from BlockInfo to copy from.
    */
   protected BlockInfo(BlockInfo from) {
-    this(from, from.bc.getReplication());
+    this(from, from.bc.getBlockReplication());
     this.bc = from.bc;
   }
 
@@ -335,7 +335,7 @@ public class BlockInfo extends Block imp
       BlockUCState s, DatanodeDescriptor[] targets) {
     if(isComplete()) {
       return new BlockInfoUnderConstruction(
-          this, getBlockCollection().getReplication(), s, targets);
+          this, getBlockCollection().getBlockReplication(), s, targets);
     }
     // the block is already under construction
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Oct 19 02:25:55 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -58,7 +59,6 @@ import static org.apache.hadoop.util.Exi
 
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -204,6 +205,17 @@ public class BlockManager {
 
   /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
+  
+  // whether or not to issue block encryption keys.
+  final boolean encryptDataTransfer;
+
+  /**
+   * When running inside a Standby node, the node may receive block reports
+   * from datanodes before receiving the corresponding namespace edits from
+   * the active NameNode. Thus, it will postpone them for later processing,
+   * instead of marking the blocks as corrupt.
+   */
+  private boolean shouldPostponeBlocksFromFuture = false;
 
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
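
The javadoc above implies a lifecycle for this flag: raised while the node serves as standby, lowered when it becomes active, at which point the queued reports are replayed. A plausible sketch of that handoff, built from the setPostponeBlocksFromFuture() and processAllPendingDNMessages() methods added later in this commit (the surrounding method names are illustrative, not the actual HA transition code):

    import java.io.IOException;

    // Illustrative only: how an HA state transition might drive the flag.
    void becomeStandby(BlockManager bm) {
      bm.setPostponeBlocksFromFuture(true);   // edits may lag the active NN
    }

    void becomeActive(BlockManager bm) throws IOException {
      bm.setPostponeBlocksFromFuture(false);  // namespace is now authoritative
      bm.processAllPendingDNMessages();       // replay the postponed reports
    }
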
@@ -255,33 +267,24 @@ public class BlockManager {
                                              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
     this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
 
-    this.blocksInvalidateWorkPct = conf.getFloat(
-        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
-        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT);
-    Preconditions.checkArgument(
-        (this.blocksInvalidateWorkPct > 0),
-        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION +
-        " = '" + this.blocksInvalidateWorkPct + "' is invalid. " +
-        "It should be a positive, non-zero float value " +
-        "indicating a percentage.");
-    this.blocksReplWorkMultiplier = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
-    Preconditions.checkArgument(
-        (this.blocksReplWorkMultiplier > 0),
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION +
-        " = '" + this.blocksReplWorkMultiplier + "' is invalid. " +
-        "It should be a positive, non-zero integer value.");
+    this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
+    this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
 
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                   DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+    
+    this.encryptDataTransfer =
+        conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
+            DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
     LOG.info("maxReplicationStreams      = " + maxReplicationStreams);
     LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
     LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
+    LOG.info("encryptDataTransfer        = " + encryptDataTransfer);
   }
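
As the constructor change above shows, data-transfer encryption is switched on by a boolean key and parameterized by an algorithm key, both read through DFSConfigKeys. A minimal sketch of enabling it; the literal property names are an assumption about what these constants resolve to in this branch:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // DFS_ENCRYPT_DATA_TRANSFER_KEY, assumed to be "dfs.encrypt.data.transfer"
    conf.setBoolean("dfs.encrypt.data.transfer", true);
    // DFS_DATA_ENCRYPTION_ALGORITHM_KEY, assumed to be
    // "dfs.encrypt.data.transfer.algorithm"
    conf.set("dfs.encrypt.data.transfer.algorithm", "3des");
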
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -301,10 +304,14 @@ public class BlockManager {
     final long lifetimeMin = conf.getLong(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
+    final String encryptionAlgorithm = conf.get(
+        DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY
         + "=" + updateMin + " min(s), "
         + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
-        + "=" + lifetimeMin + " min(s)");
+        + "=" + lifetimeMin + " min(s), "
+        + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY
+        + "=" + encryptionAlgorithm);
     
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
@@ -313,10 +320,17 @@ public class BlockManager {
       String thisNnId = HAUtil.getNameNodeId(conf, nsId);
       String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1);
+          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
+          encryptionAlgorithm);
     } else {
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, 0);
+          lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
+    }
+  }
+  
+  public void setBlockPoolId(String blockPoolId) {
+    if (isBlockTokenEnabled()) {
+      blockTokenSecretManager.setBlockPoolId(blockPoolId);
     }
   }
 
@@ -343,10 +357,16 @@ public class BlockManager {
   }
 
   public void close() {
-    if (pendingReplications != null) pendingReplications.stop();
-    blocksMap.close();
+    try {
+      if (replicationThread != null) {
+        replicationThread.interrupt();
+        replicationThread.join(3000);
+      }
+    } catch (InterruptedException ie) {
+    }
     datanodeManager.close();
-    if (replicationThread != null) replicationThread.interrupt();
+    pendingReplications.stop();
+    blocksMap.close();
   }
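
The reordered close() above follows a common shutdown discipline: stop the worker thread first, with an interrupt plus a bounded join, and only then tear down the structures it reads. A generic sketch of that ordering with hypothetical names, not BlockManager code:

    import java.io.Closeable;
    import java.io.IOException;

    void shutdown(Thread worker, Closeable resource) throws IOException {
      worker.interrupt();
      try {
        worker.join(3000);  // bounded wait so shutdown cannot hang
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();  // preserve interrupt status
      }
      resource.close();  // safe now: the worker no longer touches it
    }
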
 
   /** @return the datanodeManager */
@@ -783,6 +803,14 @@ public class BlockManager {
       nodeinfo.needKeyUpdate = false;
     }
   }
+  
+  public DataEncryptionKey generateDataEncryptionKey() {
+    if (isBlockTokenEnabled() && encryptDataTransfer) {
+      return blockTokenSecretManager.generateDataEncryptionKey();
+    } else {
+      return null;
+    }
+  }
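
Note the contract of generateDataEncryptionKey() above: it returns null unless both block tokens and data-transfer encryption are enabled, so a caller must treat null as permission to talk in the clear. A hypothetical call site:

    // bm is a BlockManager; DataEncryptionKey is the class imported above.
    DataEncryptionKey key = bm.generateDataEncryptionKey();
    if (key != null) {
      // wrap the data-transfer streams using the key material
    } else {
      // encryption disabled: proceed with plain streams
    }
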
 
   /**
    * Clamp the specified replication between the minimum and the maximum
@@ -969,7 +997,7 @@ public class BlockManager {
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
-    if (countNodes(b.stored).liveReplicas() >= bc.getReplication()) {
+    if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {
@@ -1013,6 +1041,12 @@ public class BlockManager {
     }
   }
 
+
+  public void setPostponeBlocksFromFuture(boolean postpone) {
+    this.shouldPostponeBlocksFromFuture  = postpone;
+  }
+
+
   private void postponeBlock(Block blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
       postponedMisreplicatedBlocksCount++;
@@ -1101,7 +1135,7 @@ public class BlockManager {
               continue;
             }
 
-            requiredReplication = bc.getReplication();
+            requiredReplication = bc.getBlockReplication();
 
             // get a source data-node
             containingNodes = new ArrayList<DatanodeDescriptor>();
@@ -1187,7 +1221,7 @@ public class BlockManager {
             neededReplications.decrementReplicationIndex(priority);
             continue;
           }
-          requiredReplication = bc.getReplication();
+          requiredReplication = bc.getBlockReplication();
 
           // do not schedule more if enough replicas is already pending
           NumberReplicas numReplicas = countNodes(block);
@@ -1280,8 +1314,9 @@ public class BlockManager {
       final HashMap<Node, Node> excludedNodes,
       final long blocksize) throws IOException {
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
-        src, numOfReplicas, client, excludedNodes, blocksize);
+    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
+        numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
+        excludedNodes, blocksize);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -1456,7 +1491,7 @@ public class BlockManager {
   public void processReport(final DatanodeID nodeID, final String poolId,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
-    final long startTime = Util.now(); //after acquiring write lock
+    final long startTime = Time.now(); //after acquiring write lock
     final long endTime;
     try {
       final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
@@ -1495,7 +1530,7 @@ public class BlockManager {
       }
       
     } finally {
-      endTime = Util.now();
+      endTime = Time.now();
       namesystem.writeUnlock();
     }
 
@@ -1589,13 +1624,11 @@ public class BlockManager {
     assert (node.numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
-    boolean isStandby = namesystem.isInStandbyState();
-    
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState reportedState = itBR.getCurrentReplicaState();
       
-      if (isStandby &&
+      if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
         queueReportedBlock(node, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
@@ -1611,7 +1644,7 @@ public class BlockManager {
       BlockToMarkCorrupt c = checkReplicaCorrupt(
           iblk, reportedState, storedBlock, ucState, node);
       if (c != null) {
-        if (namesystem.isInStandbyState()) {
+        if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
           queueReportedBlock(node, iblk, reportedState,
@@ -1717,7 +1750,7 @@ public class BlockManager {
           + " replicaState = " + reportedState);
     }
   
-    if (namesystem.isInStandbyState() &&
+    if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block.getGenerationStamp())) {
       queueReportedBlock(dn, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
@@ -1750,7 +1783,7 @@ assert storedBlock.findDatanode(dn) < 0 
     BlockToMarkCorrupt c = checkReplicaCorrupt(
         block, reportedState, storedBlock, ucState, dn);
     if (c != null) {
-      if (namesystem.isInStandbyState()) {
+      if (shouldPostponeBlocksFromFuture) {
         // If the block is an out-of-date generation stamp or state,
         // but we're the standby, we shouldn't treat it as corrupt,
         // but instead just queue it for later processing.
@@ -1783,7 +1816,7 @@ assert storedBlock.findDatanode(dn) < 0 
    */
   private void queueReportedBlock(DatanodeDescriptor dn, Block block,
       ReplicaState reportedState, String reason) {
-    assert namesystem.isInStandbyState();
+    assert shouldPostponeBlocksFromFuture;
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queueing reported block " + block +
@@ -1826,9 +1859,9 @@ assert storedBlock.findDatanode(dn) < 0 
    * with the namespace information.
    */
   public void processAllPendingDNMessages() throws IOException {
-    assert !namesystem.isInStandbyState() :
-      "processAllPendingDNMessages() should be called after exiting " +
-      "standby state!";
+    assert !shouldPostponeBlocksFromFuture :
+      "processAllPendingDNMessages() should be called after disabling " +
+      "block postponement.";
     int count = pendingDNMessages.count();
     if (count > 0) {
       LOG.info("Processing " + count + " messages from DataNodes " +
@@ -2056,7 +2089,7 @@ assert storedBlock.findDatanode(dn) < 0 
     }
 
     // handle underReplication/overReplication
-    short fileReplication = bc.getReplication();
+    short fileReplication = bc.getBlockReplication();
     if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedReplicas(), fileReplication);
@@ -2195,7 +2228,7 @@ assert storedBlock.findDatanode(dn) < 0 
       return MisReplicationResult.UNDER_CONSTRUCTION;
     }
     // calculate current replication
-    short expectedReplication = bc.getReplication();
+    short expectedReplication = bc.getBlockReplication();
     NumberReplicas num = countNodes(block);
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
@@ -2666,7 +2699,7 @@ assert storedBlock.findDatanode(dn) < 0 
     while(it.hasNext()) {
       final Block block = it.next();
       BlockCollection bc = blocksMap.getBlockCollection(block);
-      short expectedReplication = bc.getReplication();
+      short expectedReplication = bc.getBlockReplication();
       NumberReplicas num = countNodes(block);
       int numCurrentReplica = num.liveReplicas();
       if (numCurrentReplica > expectedReplication) {
@@ -2812,7 +2845,7 @@ assert storedBlock.findDatanode(dn) < 0 
     if (bc == null) { // block does not belong to any file
       return 0;
     }
-    return bc.getReplication();
+    return bc.getBlockReplication();
   }
 
 
@@ -2823,6 +2856,9 @@ assert storedBlock.findDatanode(dn) < 0 
    * @return number of blocks scheduled for removal during this iteration.
    */
   private int invalidateWorkForOneNode(String nodeId) {
+    final List<Block> toInvalidate;
+    final DatanodeDescriptor dn;
+    
     namesystem.writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
@@ -2832,10 +2868,23 @@ assert storedBlock.findDatanode(dn) < 0 
       }
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
-      return invalidateBlocks.invalidateWork(nodeId);
+      dn = datanodeManager.getDatanode(nodeId);
+      if (dn == null) {
+        invalidateBlocks.remove(nodeId);
+        return 0;
+      }
+      toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
+      if (toInvalidate == null) {
+        return 0;
+      }
     } finally {
       namesystem.writeUnlock();
     }
+    if (NameNode.stateChangeLog.isInfoEnabled()) {
+      NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+          + ": ask " + dn + " to delete " + toInvalidate);
+    }
+    return toInvalidate.size();
   }
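
The rewrite of invalidateWorkForOneNode() above also narrows the lock scope: the work list is computed under the namesystem write lock, but the INFO line is logged only after the lock is released. The general shape, with a hypothetical computeInvalidateWork() helper standing in for the lookup logic:

    final List<Block> work;
    namesystem.writeLock();
    try {
      work = computeInvalidateWork(nodeId);  // hypothetical helper
    } finally {
      namesystem.writeUnlock();
    }
    // String building and I/O happen with no lock held.
    NameNode.stateChangeLog.info("ask " + nodeId + " to delete " + work);
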
 
   boolean blockHasEnoughRacks(Block b) {
@@ -2966,7 +3015,7 @@ assert storedBlock.findDatanode(dn) < 0 
           break;
         } catch (Throwable t) {
           LOG.fatal("ReplicationMonitor thread received Runtime exception. ", t);
-          terminate(1);
+          terminate(1, t);
         }
       }
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Fri Oct 19 02:25:55 2012
@@ -71,21 +71,6 @@ public abstract class BlockPlacementPoli
                                              long blocksize);
 
   /**
-   * Same as
-   * {{@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, HashMap, long)}
-   * with returnChosenNodes equal to false.
-   */
-  final DatanodeDescriptor[] chooseTarget(String srcPath,
-                                          int numOfReplicas,
-                                          DatanodeDescriptor writer,
-                                          List<DatanodeDescriptor> chosenNodes,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize) {
-    return chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, false,
-        excludedNodes, blocksize);
-  }
-
-  /**
    * Choose <i>numOfReplicas</i> data nodes for <i>writer</i>
    * to re-replicate a block with size <i>blocksize</i>.
    * If that many nodes cannot be found, return as many as we can.
@@ -131,7 +116,7 @@ public abstract class BlockPlacementPoli
                                     HashMap<Node, Node> excludedNodes,
                                     long blocksize) {
     return chooseTarget(srcBC.getName(), numOfReplicas, writer,
-                        chosenNodes, excludedNodes, blocksize);
+                        chosenNodes, false, excludedNodes, blocksize);
   }
 
   /**
@@ -198,51 +183,6 @@ public abstract class BlockPlacementPoli
     replicator.initialize(conf, stats, clusterMap);
     return replicator;
   }
-
-  /**
-   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param srcPath a string representation of the file for which chooseTarget is invoked
-   * @param numOfReplicas number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param blocksize size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    long blocksize) {
-    return chooseTarget(srcPath, numOfReplicas, writer,
-                        new ArrayList<DatanodeDescriptor>(),
-                        blocksize);
-  }
-
-  /**
-   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i>
-   * If not, return as many as we can.
-   *
-   * @param srcPath a string representation of the file for which chooseTarget is invoked
-   * @param numOfReplicas number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param blocksize size of the data to be written.
-   * @param excludedNodes datanodes that should not be considered as targets.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  public DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    return chooseTarget(srcPath, numOfReplicas, writer,
-                        new ArrayList<DatanodeDescriptor>(),
-                        excludedNodes,
-                        blocksize);
-  }
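
With the convenience overloads above removed, every call site now spells out the full argument list; the BlockManager hunk earlier in this commit shows the mechanical rewrite:

    // Before (removed overload):
    //   blockplacement.chooseTarget(src, numOfReplicas, client,
    //       excludedNodes, blocksize);
    // After (canonical signature):
    blockplacement.chooseTarget(src, numOfReplicas, client,
        new ArrayList<DatanodeDescriptor>(), false, excludedNodes, blocksize);
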
   
   /**
    * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.


