hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1196458 [2/9] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ ...
Date Wed, 02 Nov 2011 05:35:26 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Nov  2 05:34:31 2011
@@ -388,6 +388,8 @@ public class DFSInputStream extends FSIn
     DatanodeInfo chosenNode = null;
     int refetchToken = 1; // only need to get a new access token once
     
+    boolean connectFailedOnce = false;
+
     while (true) {
       //
       // Compute desired block
@@ -409,6 +411,10 @@ public class DFSInputStream extends FSIn
             accessToken,
             offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
             buffersize, verifyChecksum, dfsClient.clientName);
+        if(connectFailedOnce) {
+          DFSClient.LOG.info("Successfully connected to " + targetAddr +
+                             " for block " + blk.getBlockId());
+        }
         return chosenNode;
       } catch (IOException ex) {
         if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
@@ -428,11 +434,9 @@ public class DFSInputStream extends FSIn
           refetchToken--;
           fetchBlockAt(target);
         } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr
-              + ", add to deadNodes and continue " + ex);
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Connection failure ", ex);
-          }
+          connectFailedOnce = true;
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+            + ", add to deadNodes and continue. " + ex, ex);
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Nov  2 05:34:31 2011
@@ -1033,9 +1033,7 @@ class DFSOutputStream extends FSOutputSu
         // send the request
         new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
             nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
-            nodes.length, block.getNumBytes(), bytesSent, newGS);
-        checksum.writeHeader(out);
-        out.flush();
+            nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Nov  2 05:34:31 2011
@@ -22,36 +22,29 @@ import static org.apache.hadoop.hdfs.DFS
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
-import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -688,84 +681,38 @@ public class DFSUtil {
 
 
   /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode(Configuration conf) throws IOException {
+  public static ClientProtocol createNamenode(Configuration conf)
+      throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
 
   /** Create a {@link NameNode} proxy */
   public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
-      Configuration conf) throws IOException {
-    return createNamenode(nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
-  }
-  
-  /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
-      Configuration conf, UserGroupInformation ugi) throws IOException {
-    return createNamenode(createRPCNamenode(nameNodeAddr, conf, ugi));
-  }
-
-  /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
-      Configuration conf, UserGroupInformation ugi) 
-    throws IOException {
-    return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
-        ClientProtocol.versionID, nameNodeAddr, ugi, conf,
-        NetUtils.getSocketFactory(conf, ClientProtocol.class));
+      Configuration conf) throws IOException {   
+    return createNamenode(nameNodeAddr, conf,
+        UserGroupInformation.getCurrentUser());
   }
 
   /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
-    throws IOException {
-    RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-        5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-    
-    Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
-
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(RemoteException.class, 
-        RetryPolicies.retryByRemoteException(
-            RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
-    
-    methodNameToPolicyMap.put("create", methodPolicy);
-
-    return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
-        rpcNamenode, methodNameToPolicyMap);
+  public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
+      Configuration conf, UserGroupInformation ugi) throws IOException {
+    /** 
+     * Currently we have simply burnt-in support for a SINGLE
+     * protocol - protocolR23Compatible. This will be replaced
+     * by a way to pick the right protocol based on the 
+     * version of the target server.  
+     */
+    return new org.apache.hadoop.hdfs.protocolR23Compatible.
+        ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi);
   }
 
   /** Create a {@link ClientDatanodeProtocol} proxy */
   public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
       DatanodeID datanodeid, Configuration conf, int socketTimeout,
-      LocatedBlock locatedBlock)
-      throws IOException {
-    InetSocketAddress addr = NetUtils.createSocketAddr(
-      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
-    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
-      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
-    }
-    
-    // Since we're creating a new UserGroupInformation here, we know that no
-    // future RPC proxies will be able to re-use the same connection. And
-    // usages of this proxy tend to be one-off calls.
-    //
-    // This is a temporary fix: callers should really achieve this by using
-    // RPC.stopProxy() on the resulting object, but this is currently not
-    // working in trunk. See the discussion on HDFS-1965.
-    Configuration confWithNoIpcIdle = new Configuration(conf);
-    confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
-        .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
-
-    UserGroupInformation ticket = UserGroupInformation
-        .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
-    ticket.addToken(locatedBlock.getBlockToken());
-    return RPC.getProxy(ClientDatanodeProtocol.class,
-        ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
-        NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+      LocatedBlock locatedBlock) throws IOException {
+    return new org.apache.hadoop.hdfs.protocolR23Compatible.
+        ClientDatanodeProtocolTranslatorR23(datanodeid, conf, socketTimeout,
+             locatedBlock);
   }
 
   /**
@@ -776,6 +723,14 @@ public class DFSUtil {
     return collection != null && collection.size() != 0;
   }
   
+  /** Create a {@link ClientDatanodeProtocol} proxy */
+  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return new org.apache.hadoop.hdfs.protocolR23Compatible.
+        ClientDatanodeProtocolTranslatorR23(addr, ticket, conf, factory);
+  }
+  
   /**
    * Get nameservice Id for the {@link NameNode} based on namenode RPC address
    * matching the local node address.
@@ -919,4 +874,14 @@ public class DFSUtil {
   private interface AddressMatcher {
     public boolean match(InetSocketAddress s);
   }
+
+  /** Create a URI from the scheme and address */
+  public static URI createUri(String scheme, InetSocketAddress address) {
+    try {
+      return new URI(scheme, null, address.getHostName(), address.getPort(),
+          null, null, null);
+    } catch (URISyntaxException ue) {
+      throw new IllegalArgumentException(ue);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Nov  2 05:34:31 2011
@@ -810,7 +810,6 @@ public class DistributedFileSystem exten
   ) throws IOException {
     Token<DelegationTokenIdentifier> result =
       dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
-    result.setService(new Text(getCanonicalServiceName()));
     return result;
   }
 
@@ -830,7 +829,7 @@ public class DistributedFileSystem exten
   @Deprecated
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    return dfs.getDelegationToken(renewer);
+    return getDelegationToken(renewer.toString());
   }
   
   @Override // FileSystem
@@ -847,10 +846,15 @@ public class DistributedFileSystem exten
    * @param token delegation token obtained earlier
    * @return the new expiration time
    * @throws IOException
+   * @deprecated Use Token.renew instead.
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
-    return dfs.renewDelegationToken(token);
+    try {
+      return token.renew(getConf());
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Caught interrupted", ie);
+    }
   }
 
   /**
@@ -858,10 +862,15 @@ public class DistributedFileSystem exten
    * 
    * @param token delegation token
    * @throws IOException
+   * @deprecated Use Token.cancel instead.
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    dfs.cancelDelegationToken(token);
+    try {
+      token.cancel(getConf());
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Caught interrupted", ie);
+    }
   }
 
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Wed Nov  2 05:34:31 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -43,12 +44,15 @@ public class HDFSPolicyProvider extends 
     new Service("security.inter.datanode.protocol.acl", 
                 InterDatanodeProtocol.class),
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
-    new Service("security.refresh.policy.protocol.acl", 
-                RefreshAuthorizationPolicyProtocol.class),
-    new Service("security.refresh.user.mappings.protocol.acl", 
-                RefreshUserMappingsProtocol.class),
-    new Service("security.get.user.mappings.protocol.acl",
-                GetUserMappingsProtocol.class)
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY, 
+        RefreshAuthorizationPolicyProtocol.class),
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_USER_MAPPINGS, 
+        RefreshUserMappingsProtocol.class),
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GET_USER_MAPPINGS,
+        GetUserMappingsProtocol.class)
   };
   
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Wed Nov  2 05:34:31 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.ref.WeakReference;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -32,9 +31,6 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.TimeZone;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -49,6 +45,8 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
@@ -60,6 +58,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ServletUtil;
 import org.xml.sax.Attributes;
@@ -78,20 +77,28 @@ import org.xml.sax.helpers.XMLReaderFact
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class HftpFileSystem extends FileSystem {
+public class HftpFileSystem extends FileSystem
+    implements DelegationTokenRenewer.Renewable {
+  private static final DelegationTokenRenewer<HftpFileSystem> dtRenewer
+      = new DelegationTokenRenewer<HftpFileSystem>(HftpFileSystem.class);
+  
   static {
     HttpURLConnection.setFollowRedirects(true);
+    dtRenewer.start();
   }
 
+  public static final Text TOKEN_KIND = new Text("HFTP delegation");
+
   private String nnHttpUrl;
-  private URI hdfsURI;
+  private Text hdfsServiceName;
+  private URI hftpURI;
   protected InetSocketAddress nnAddr;
   protected UserGroupInformation ugi; 
 
   public static final String HFTP_TIMEZONE = "UTC";
   public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
-  private Token<DelegationTokenIdentifier> delegationToken;
-  public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
+  private Token<?> delegationToken;
+  private Token<?> renewToken;
   
   public static final SimpleDateFormat getDateFormat() {
     final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
@@ -106,19 +113,23 @@ public class HftpFileSystem extends File
     }
   };
 
-  private static RenewerThread renewer = new RenewerThread();
-  static {
-    renewer.start();
-  }
-
   @Override
   protected int getDefaultPort() {
-    return DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT;
+    return getDefaultSecurePort();
+
+    //TODO: un-comment the following once HDFS-7510 is committed. 
+//    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+//        DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
+  }
+
+  protected int getDefaultSecurePort() {
+    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+        DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
   }
 
   @Override
   public String getCanonicalServiceName() {
-    return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
+    return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
   }
   
   private String buildUri(String schema, String host, int port) {
@@ -127,7 +138,6 @@ public class HftpFileSystem extends File
   }
 
 
-  @SuppressWarnings("unchecked")
   @Override
   public void initialize(final URI name, final Configuration conf)
   throws IOException {
@@ -144,17 +154,21 @@ public class HftpFileSystem extends File
       urlPort = conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY, 
           DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
 
-    nnHttpUrl = 
-      buildUri("https://", NetUtils.normalizeHostName(name.getHost()), urlPort);
+    String normalizedNN = NetUtils.normalizeHostName(name.getHost());
+    nnHttpUrl = buildUri("https://", normalizedNN ,urlPort);
     LOG.debug("using url to get DT:" + nnHttpUrl);
+    try {
+      hftpURI = new URI(buildUri("hftp://", normalizedNN, urlPort));
+    } catch (URISyntaxException ue) {
+      throw new IOException("bad uri for hdfs", ue);
+    }
 
-    
-    
     // if one uses RPC port different from the Default one,  
     // one should specify what is the setvice name for this delegation token
     // otherwise it is hostname:RPC_PORT
-    String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
-    SecurityUtil.buildDTServiceName(name, DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
+    String key = DelegationTokenSelector.SERVICE_NAME_KEY
+        + SecurityUtil.buildDTServiceName(name,
+            DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
     if(LOG.isDebugEnabled()) {
       LOG.debug("Trying to find DT for " + name + " using key=" + key + 
           "; conf=" + conf.get(key, ""));
@@ -165,9 +179,10 @@ public class HftpFileSystem extends File
       nnPort = NetUtils.createSocketAddr(nnServiceName, 
           NameNode.DEFAULT_PORT).getPort();
     }
-
     try {
-      hdfsURI = new URI(buildUri("hdfs://", nnAddr.getHostName(), nnPort));
+      URI hdfsURI = new URI("hdfs://" + normalizedNN + ":" + nnPort);
+      hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI, 
+                                                                 nnPort));
     } catch (URISyntaxException ue) {
       throw new IOException("bad uri for hdfs", ue);
     }
@@ -175,30 +190,73 @@ public class HftpFileSystem extends File
     if (UserGroupInformation.isSecurityEnabled()) {
       //try finding a token for this namenode (esp applicable for tasks
       //using hftp). If there exists one, just set the delegationField
-      String canonicalName = getCanonicalServiceName();
+      String hftpServiceName = getCanonicalServiceName();
       for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
-        if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind()) &&
-            t.getService().toString().equals(canonicalName)) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Found existing DT for " + name);
+        Text kind = t.getKind();
+        if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)) {
+          if (t.getService().equals(hdfsServiceName)) {
+            setDelegationToken(t);
+            break;
+          }
+        } else if (TOKEN_KIND.equals(kind)) {
+          if (hftpServiceName
+              .equals(normalizeService(t.getService().toString()))) {
+            setDelegationToken(t);
+            break;
           }
-          delegationToken = (Token<DelegationTokenIdentifier>) t;
-          break;
         }
       }
       
       //since we don't already have a token, go get one over https
       if (delegationToken == null) {
-        delegationToken = 
-          (Token<DelegationTokenIdentifier>) getDelegationToken(null);
-        renewer.addTokenToRenew(this);
+        setDelegationToken(getDelegationToken(null));
+        dtRenewer.addRenewAction(this);
       }
     }
   }
+
+  private String normalizeService(String service) {
+    int colonIndex = service.indexOf(':');
+    if (colonIndex == -1) {
+      throw new IllegalArgumentException("Invalid service for hftp token: " + 
+                                         service);
+    }
+    String hostname = 
+        NetUtils.normalizeHostName(service.substring(0, colonIndex));
+    String port = service.substring(colonIndex + 1);
+    return hostname + ":" + port;
+  }
+
+  //TODO: un-comment the following once HDFS-7510 is committed. 
+//  protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
+//    Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
+//    return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());      
+//  }
+  
+  protected Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
+    return  DelegationTokenSelector.selectHdfsDelegationToken(
+        nnAddr, ugi, getConf());
+  }
   
 
   @Override
-  public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
+  public Token<?> getRenewToken() {
+    return renewToken;
+  }
+
+  @Override
+  public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+    renewToken = token;
+    // emulate the 203 usage of the tokens
+    // by setting the kind and service as if they were hdfs tokens
+    delegationToken = new Token<T>(token);
+    delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    delegationToken.setService(hdfsServiceName);
+  }
+
+  @Override
+  public synchronized Token<?> getDelegationToken(final String renewer
+                                                  ) throws IOException {
     try {
       //Renew TGT if needed
       ugi.reloginFromKeytab();
@@ -221,7 +279,6 @@ public class HftpFileSystem extends File
               LOG.debug("Got dt for " + getUri() + ";t.service="
                   +t.getService());
             }
-            t.setService(new Text(getCanonicalServiceName()));
             return t;
           }
           return null;
@@ -594,157 +651,43 @@ public class HftpFileSystem extends File
     return cs != null? cs: super.getContentSummary(f);
   }
 
+  @InterfaceAudience.Private
+  public static class TokenManager extends TokenRenewer {
 
-  /**
-   * An action that will renew and replace the hftp file system's delegation 
-   * tokens automatically.
-   */
-  private static class RenewAction implements Delayed {
-    // when should the renew happen
-    private long timestamp;
-    // a weak reference to the file system so that it can be garbage collected
-    private final WeakReference<HftpFileSystem> weakFs;
-
-    RenewAction(long timestamp, HftpFileSystem fs) {
-      this.timestamp = timestamp;
-      this.weakFs = new WeakReference<HftpFileSystem>(fs);
-    }
-
-    /**
-     * Get the delay until this event should happen.
-     */
     @Override
-    public long getDelay(TimeUnit unit) {
-      long millisLeft = timestamp - System.currentTimeMillis();
-      return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
+    public boolean handleKind(Text kind) {
+      return kind.equals(TOKEN_KIND);
     }
 
-    /**
-     * Compare two events in the same queue.
-     */
-    @Override
-    public int compareTo(Delayed o) {
-      if (o.getClass() != RenewAction.class) {
-        throw new IllegalArgumentException("Illegal comparision to non-RenewAction");
-      }
-      RenewAction other = (RenewAction) o;
-      return timestamp < other.timestamp ? -1 :
-        (timestamp == other.timestamp ? 0 : 1);
-    }
-    
     @Override
-    public int hashCode() {
-      assert false : "hashCode not designed";
-    return 33;  
-    }
-    /**
-     * equals
-     */
-    @Override
-    public boolean equals(Object o) {
-      if(!( o instanceof Delayed))
-        return false;
-      
-      return compareTo((Delayed) o) == 0;
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
     }
 
-    /**
-     * Set a new time for the renewal. Can only be called when the action
-     * is not in the queue.
-     * @param newTime the new time
-     */
-    public void setNewTime(long newTime) {
-      timestamp = newTime;
-    }
-
-    /**
-     * Renew or replace the delegation token for this file system.
-     * @return
-     * @throws IOException
-     */
     @SuppressWarnings("unchecked")
-    public boolean renew() throws IOException, InterruptedException {
-      final HftpFileSystem fs = weakFs.get();
-      if (fs != null) {
-        synchronized (fs) {
-          fs.ugi.reloginFromKeytab();
-          fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
-
-            @Override
-            public Void run() throws Exception {
-              try {
-                DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl, 
-                    fs.delegationToken);
-              } catch (IOException ie) {
-                try {
-                  fs.delegationToken = 
-                    (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
-                } catch (IOException ie2) {
-                  throw new IOException("Can't renew or get new delegation token ", 
-                      ie);
-                }
-              }
-              return null;
-            } 
-          });
-        }
-      }
-      return fs != null;
-    }
-
-    public String toString() {
-      StringBuilder result = new StringBuilder();
-      HftpFileSystem fs = weakFs.get();
-      if (fs == null) {
-        return "evaporated token renew";
-      }
-      synchronized (fs) {
-        result.append(fs.delegationToken);
-      }
-      result.append(" renew in ");
-      result.append(getDelay(TimeUnit.SECONDS));
-      result.append(" secs");
-      return result.toString();
+    @Override
+    public long renew(Token<?> token, 
+                      Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      // use https to renew the token
+      return 
+        DelegationTokenFetcher.renewDelegationToken
+        ("https://" + token.getService().toString(), 
+         (Token<DelegationTokenIdentifier>) token);
     }
-  }
 
-  /**
-   * A daemon thread that waits for the next file system to renew.
-   */
-  private static class RenewerThread extends Thread {
-    private DelayQueue<RenewAction> queue = new DelayQueue<RenewAction>();
-    // wait for 95% of a day between renewals
-    private static final int RENEW_CYCLE = (int) (0.95 * 24 * 60 * 60 * 1000);
-
-    public RenewerThread() {
-      super("HFTP Delegation Token Renewer");
-      setDaemon(true);
-    }
-
-    public void addTokenToRenew(HftpFileSystem fs) {
-      queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
-    }
-
-    public void run() {
-      RenewAction action = null;
-      while (true) {
-        try {
-          action = queue.take();
-          if (action.renew()) {
-            action.setNewTime(RENEW_CYCLE + System.currentTimeMillis());
-            queue.add(action);
-          }
-          action = null;
-        } catch (InterruptedException ie) {
-          return;
-        } catch (Exception ie) {
-          if (action != null) {
-            LOG.warn("Failure to renew token " + action, ie);
-          } else {
-            LOG.warn("Failure in renew queue", ie);
-          }
-        }
-      }
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, 
+                       Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      // use https to cancel the token
+      DelegationTokenFetcher.cancelDelegationToken
+        ("https://" + token.getService().toString(), 
+         (Token<DelegationTokenIdentifier>) token);
     }
+    
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Nov  2 05:34:31 2011
@@ -33,10 +33,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -408,11 +411,14 @@ public class RemoteBlockReader extends F
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
     checkSuccess(status, sock, block, file);
-    DataChecksum checksum = DataChecksum.newDataChecksum( in );
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
     //Warning when we get CHECKSUM_NULL?
     
     // Read the first chunk offset.
-    long firstChunkOffset = in.readLong();
+    long firstChunkOffset = checksumInfo.getChunkOffset();
     
     if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
         firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed Nov  2 05:34:31 2011
@@ -37,10 +37,29 @@ import org.apache.hadoop.security.token.
     serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
-  public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
-
   /**
+   * Until version 9, this class ClientDatanodeProtocol served as both
+   * the client interface to the DN AND the RPC protocol used to 
+   * communicate with the NN.
+   * 
+   * Post version 10 (release 23 of Hadoop), the protocol is implemented in
+   * {@literal ../protocolR23Compatible/ClientDatanodeWireProtocol}
+   * 
+   * This class is used by both the DFSClient and the 
+   * DN server side to insulate from the protocol serialization.
+   * 
+   * If you are adding/changing DN's interface then you need to 
+   * change both this class and ALSO
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol}.
+   * These changes need to be done in a compatible fashion as described in 
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}
+   * 
+   * The log of historical changes can be retrieved from the svn).
    * 9: Added deleteBlockPool method
+   * 
+   * 9 is the last version id when this class was used for protocols
+   *  serialization. DO not update this version any further. 
+   *  Changes are recorded in R23 classes.
    */
   public static final long versionID = 9L;
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Nov  2 05:34:31 2011
@@ -65,10 +65,28 @@ import org.apache.hadoop.hdfs.security.t
 public interface ClientProtocol extends VersionedProtocol {
 
   /**
-   * Compared to the previous version the following changes have been introduced:
-   * (Only the latest change is reflected.
+   * Until version 69, this class ClientProtocol served as both
+   * the client interface to the NN AND the RPC protocol used to 
+   * communicate with the NN.
+   * 
+   * Post version 70 (release 23 of Hadoop), the protocol is implemented in
+   * {@literal ../protocolR23Compatible/ClientNamenodeWireProtocol}
+   * 
+   * This class is used by both the DFSClient and the 
+   * NN server side to insulate from the protocol serialization.
+   * 
+   * If you are adding/changing NN's interface then you need to 
+   * change both this class and ALSO
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}.
+   * These changes need to be done in a compatible fashion as described in 
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}
+   * 
    * The log of historical changes can be retrieved from the svn).
    * 69: Eliminate overloaded method names.
+   * 
+   * 69L is the last version id when this class was used for protocols
+   *  serialization. DO not update this version any further. 
+   *  Changes are recorded in R23 classes.
    */
   public static final long versionID = 69L;
   
@@ -373,11 +391,8 @@ public interface ClientProtocol extends 
    * @return true if successful, or false if the old name does not exist
    * or if the new name already belongs to the namespace.
    * 
-   * @throws IOException an I/O error occurred
-   * 
-   * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
+   * @throws IOException an I/O error occurred 
    */
-  @Deprecated
   public boolean rename(String src, String dst) 
       throws UnresolvedLinkException, IOException;
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Nov  2 05:34:31 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.WritableComparable;
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Wed Nov  2 05:34:31 2011
@@ -75,6 +75,13 @@ public class DatanodeInfo extends Datano
     public String toString() {
       return value;
     }
+    
+    public static AdminStates fromValue(final String value) {
+      for (AdminStates as : AdminStates.values()) {
+        if (as.value.equals(value)) return as;
+      }
+      return NORMAL;
+    }
   }
 
   @Nullable
@@ -110,11 +117,20 @@ public class DatanodeInfo extends Datano
     this.adminState = null;    
   }
   
-  protected DatanodeInfo(DatanodeID nodeID, String location, String hostName) {
+  public DatanodeInfo(DatanodeID nodeID, String location, String hostName) {
     this(nodeID);
     this.location = location;
     this.hostName = hostName;
   }
+  
+  public DatanodeInfo(DatanodeID nodeID, String location, String hostName,
+      final long capacity, final long dfsUsed, final long remaining,
+      final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
+      final AdminStates adminState) {
+    this(nodeID.getName(), nodeID.getStorageID(), nodeID.getInfoPort(), nodeID
+        .getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed, lastUpdate,
+        xceiverCount, location, hostName, adminState);
+  }
 
   /** Constructor */
   public DatanodeInfo(final String name, final String storageID,

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Wed Nov  2 05:34:31 2011
@@ -26,11 +26,20 @@ import org.apache.hadoop.hdfs.HdfsConfig
  * 
  ************************************/
 @InterfaceAudience.Private
-public final class HdfsConstants {
+public class HdfsConstants {
   /* Hidden constructor */
-  private HdfsConstants() {
+  protected HdfsConstants() {
   }
-
+  
+  /**
+   * HDFS Protocol Names:  
+   */
+  public static final String CLIENT_NAMENODE_PROTOCOL_NAME = 
+      "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+  public static final String CLIENT_DATANODE_PROTOCOL_NAME = 
+      "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol";
+  
+  
   public static int MIN_BLOCKS_FOR_WRITE = 5;
 
   // Long that indicates "leave current quota unchanged"
@@ -63,7 +72,7 @@ public final class HdfsConstants {
   public static final int BYTES_IN_INTEGER = Integer.SIZE / Byte.SIZE;
 
   // SafeMode actions
-  public enum SafeModeAction {
+  public static enum SafeModeAction {
     SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET;
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java Wed Nov  2 05:34:31 2011
@@ -241,6 +241,10 @@ public class HdfsFileStatus implements W
   final public String getSymlink() {
     return DFSUtil.bytes2String(symlink);
   }
+  
+  final public byte[] getSymlinkInBytes() {
+    return symlink;
+  }
 
   //////////////////////////////////////////////////
   // Writable

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Wed Nov  2 05:34:31 2011
@@ -87,6 +87,7 @@ public abstract class HdfsProtoUtil {
       .setName(dni.getName())
       .setStorageID(dni.getStorageID())
       .setInfoPort(dni.getInfoPort())
+      .setIpcPort(dni.getIpcPort())
       .build();
   }
   
@@ -95,7 +96,7 @@ public abstract class HdfsProtoUtil {
         idProto.getName(),
         idProto.getStorageID(),
         idProto.getInfoPort(),
-        -1); // ipc port not serialized in writables either
+        idProto.getIpcPort());
   }
   
   //// DatanodeInfo ////

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Nov  2 05:34:31 2011
@@ -54,6 +54,11 @@ public class LocatedBlock implements Wri
   public LocatedBlock() {
     this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
   }
+  
+
+  public LocatedBlock(ExtendedBlock eb) {
+    this(eb, new DatanodeInfo[0], 0L, false);
+  }
 
   public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
     this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Wed Nov  2 05:34:31 2011
@@ -23,10 +23,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
 
 
 /**
@@ -35,8 +41,20 @@ import org.apache.hadoop.security.token.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-abstract class DataTransferProtoUtil {
+public abstract class DataTransferProtoUtil {
+
+  /**
+   * Map between the internal DataChecksum identifiers and the protobuf-
+   * generated identifiers on the wire.
+   */
+  static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
+    ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
+      .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
+      .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
+      .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
+      .build();
 
+  
   static BlockConstructionStage fromProto(
       OpWriteBlockProto.BlockConstructionStage stage) {
     return BlockConstructionStage.valueOf(BlockConstructionStage.class,
@@ -49,6 +67,28 @@ abstract class DataTransferProtoUtil {
         stage.name());
   }
 
+  public static ChecksumProto toProto(DataChecksum checksum) {
+    ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+    if (type == null) {
+      throw new IllegalArgumentException(
+          "Can't convert checksum to protobuf: " + checksum);
+    }
+
+    return ChecksumProto.newBuilder()
+      .setBytesPerChecksum(checksum.getBytesPerChecksum())
+      .setType(type)
+      .build();
+  }
+
+  public static DataChecksum fromProto(ChecksumProto proto) {
+    if (proto == null) return null;
+
+    int bytesPerChecksum = proto.getBytesPerChecksum();
+    int type = checksumTypeMap.inverse().get(proto.getType());
+    
+    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+  }
+
   static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
       String client, Token<BlockTokenIdentifier> blockToken) {
     ClientOperationHeaderProto header =

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Wed Nov  2 05:34:31 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -84,7 +85,8 @@ public interface DataTransferProtocol {
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException;
+      final long latestGenerationStamp,
+      final DataChecksum requestedChecksum) throws IOException;
 
   /**
    * Transfer a block to another datanode.

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Wed Nov  2 05:34:31 2011
@@ -103,7 +103,8 @@ public abstract class Receiver implement
         fromProto(proto.getStage()),
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-        proto.getLatestGenerationStamp());
+        proto.getLatestGenerationStamp(),
+        fromProto(proto.getRequestedChecksum()));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Wed Nov  2 05:34:31 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 import com.google.protobuf.Message;
 
@@ -93,10 +95,14 @@ public class Sender implements DataTrans
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
+    ChecksumProto checksumProto =
+      DataTransferProtoUtil.toProto(requestedChecksum);
+
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
       .setHeader(header)
       .addAllTargets(toProtos(targets, 1))
@@ -104,7 +110,8 @@ public class Sender implements DataTrans
       .setPipelineSize(pipelineSize)
       .setMinBytesRcvd(minBytesRcvd)
       .setMaxBytesRcvd(maxBytesRcvd)
-      .setLatestGenerationStamp(latestGenerationStamp);
+      .setLatestGenerationStamp(latestGenerationStamp)
+      .setRequestedChecksum(checksumProto);
     
     if (source != null) {
       proto.setSource(toProto(source));



Mime
View raw message