hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1299412 [2/11] - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/...
Date Sun, 11 Mar 2012 17:56:07 GMT
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Sun Mar 11 17:55:58 2012
@@ -18,26 +18,22 @@
 
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
-
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.SecureRandom;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.TimeUnit;
 
@@ -50,7 +46,6 @@ import org.apache.hadoop.fs.BlockLocatio
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -68,11 +63,19 @@ import org.apache.hadoop.ipc.RpcPayloadH
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 
 @InterfaceAudience.Private
 public class DFSUtil {
+  public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
+  
+  private DFSUtil() { /* Hidden constructor */ }
   private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
     @Override
     protected Random initialValue() {
@@ -110,13 +113,20 @@ public class DFSUtil {
           a.isDecommissioned() ? 1 : -1;
       }
     };
+  /**
+   * Address matcher for matching an address to local address
+   */
+  static final AddressMatcher LOCAL_ADDRESS_MATCHER = new AddressMatcher() {
+    public boolean match(InetSocketAddress s) {
+      return NetUtils.isLocalAddress(s.getAddress());
+    }
+  };
   
   /**
    * Whether the pathname is valid.  Currently prohibits relative paths, 
    * and names which contain a ":" or "/" 
    */
   public static boolean isValidName(String src) {
-      
     // Path must be absolute.
     if (!src.startsWith(Path.SEPARATOR)) {
       return false;
@@ -313,13 +323,39 @@ public class DFSUtil {
   /**
    * Returns collection of nameservice Ids from the configuration.
    * @param conf configuration
-   * @return collection of nameservice Ids
+   * @return collection of nameservice Ids, or an empty collection if none are configured
    */
   public static Collection<String> getNameServiceIds(Configuration conf) {
-    return conf.getStringCollection(DFS_FEDERATION_NAMESERVICES);
+    return conf.getTrimmedStringCollection(DFS_FEDERATION_NAMESERVICES);
   }
 
   /**
+   * @return <code>coll</code> if it is non-null and non-empty. Otherwise,
+   * returns a list with a single null value.
+   */
+  private static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
+    if (coll == null || coll.isEmpty()) {
+      return Collections.singletonList(null);
+    } else {
+      return coll;
+    }
+  }
+  
+  /**
+   * Namenode High Availability related configuration.
+   * Returns the collection of namenode Ids from the configuration; one
+   * logical id for each namenode in the HA setup.
+   * 
+   * @param conf configuration
+   * @param nsId the nameservice ID to look at, or null for non-federated 
+   * @return collection of namenode Ids
+   */
+  public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
+    String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
+    return conf.getTrimmedStringCollection(key);
+  }
+  
+  /**
    * Given a list of keys in the order of preference, returns a value
    * for the key in the given order from the configuration.
    * @param defaultValue default value to return, when key was not found
@@ -332,9 +368,7 @@ public class DFSUtil {
       Configuration conf, String... keys) {
     String value = null;
     for (String key : keys) {
-      if (keySuffix != null) {
-        key += "." + keySuffix;
-      }
+      key = addSuffix(key, keySuffix);
       value = conf.get(key);
       if (value != null) {
         break;
@@ -346,36 +380,84 @@ public class DFSUtil {
     return value;
   }
   
+  /** Add a non-empty, non-null suffix to a key */
+  private static String addSuffix(String key, String suffix) {
+    if (suffix == null || suffix.isEmpty()) {
+      return key;
+    }
+    assert !suffix.startsWith(".") :
+      "suffix '" + suffix + "' should not already have '.' prepended.";
+    return key + "." + suffix;
+  }
+  
+  /** Concatenate a list of suffix strings, separated by '.' */
+  private static String concatSuffixes(String... suffixes) {
+    if (suffixes == null) {
+      return null;
+    }
+    return Joiner.on(".").skipNulls().join(suffixes);
+  }
+  
   /**
-   * Returns list of InetSocketAddress for a given set of keys.
+   * Return configuration key of format key.suffix1.suffix2...suffixN
+   */
+  public static String addKeySuffixes(String key, String... suffixes) {
+    String keySuffix = concatSuffixes(suffixes);
+    return addSuffix(key, keySuffix);
+  }
+  
+  /**
+   * Returns the configured addresses of all NameNodes in the cluster.
    * @param conf configuration
-   * @param defaultAddress default address to return in case key is not found
+   * @param defaultAddress default address to return in case key is not found.
    * @param keys Set of keys to look for in the order of preference
-   * @return list of InetSocketAddress corresponding to the key
+   * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
    */
-  private static List<InetSocketAddress> getAddresses(Configuration conf,
+  private static Map<String, Map<String, InetSocketAddress>>
+    getAddresses(Configuration conf,
       String defaultAddress, String... keys) {
     Collection<String> nameserviceIds = getNameServiceIds(conf);
-    List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
-
-    // Configuration with a single namenode
-    if (nameserviceIds == null || nameserviceIds.isEmpty()) {
-      String address = getConfValue(defaultAddress, null, conf, keys);
-      if (address == null) {
-        return null;
+    
+    // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
+    // across all of the configured nameservices and namenodes.
+    Map<String, Map<String, InetSocketAddress>> ret = Maps.newHashMap();
+    for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
+      Map<String, InetSocketAddress> isas =
+        getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
+      if (!isas.isEmpty()) {
+        ret.put(nsId, isas);
       }
-      isas.add(NetUtils.createSocketAddr(address));
-    } else {
-      // Get the namenodes for all the configured nameServiceIds
-      for (String nameserviceId : nameserviceIds) {
-        String address = getConfValue(null, nameserviceId, conf, keys);
-        if (address == null) {
-          return null;
-        }
-        isas.add(NetUtils.createSocketAddr(address));
+    }
+    return ret;
+  }
+
+  private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
+      Configuration conf, String nsId, String defaultValue,
+      String[] keys) {
+    Collection<String> nnIds = getNameNodeIds(conf, nsId);
+    Map<String, InetSocketAddress> ret = Maps.newHashMap();
+    for (String nnId : emptyAsSingletonNull(nnIds)) {
+      String suffix = concatSuffixes(nsId, nnId);
+      String address = getConfValue(defaultValue, suffix, conf, keys);
+      if (address != null) {
+        InetSocketAddress isa = NetUtils.createSocketAddr(address);
+        ret.put(nnId, isa);
       }
     }
-    return isas;
+    return ret;
+  }
+
+  /**
+   * Returns the HA NN RPC addresses from the configuration, keyed by
+   * nameservice Id and namenode Id.
+   * 
+   * @param conf configuration
+   * @return map of nameservice Id to map of namenode Id to InetSocketAddress
+   */
+  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
+      Configuration conf) {
+    return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
   }
   
   /**
@@ -386,11 +468,11 @@ public class DFSUtil {
    * @return list of InetSocketAddresses
    * @throws IOException on error
    */
-  public static List<InetSocketAddress> getBackupNodeAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getBackupNodeAddresses(
       Configuration conf) throws IOException {
-    List<InetSocketAddress> addressList = getAddresses(conf,
+    Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf,
         null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
-    if (addressList == null) {
+    if (addressList.isEmpty()) {
       throw new IOException("Incorrect configuration: backup node address "
           + DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
     }
@@ -405,11 +487,11 @@ public class DFSUtil {
    * @return list of InetSocketAddresses
    * @throws IOException on error
    */
-  public static List<InetSocketAddress> getSecondaryNameNodeAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAddresses(
       Configuration conf) throws IOException {
-    List<InetSocketAddress> addressList = getAddresses(conf, null,
+    Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf, null,
         DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
-    if (addressList == null) {
+    if (addressList.isEmpty()) {
       throw new IOException("Incorrect configuration: secondary namenode address "
           + DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
     }
@@ -429,7 +511,7 @@ public class DFSUtil {
    * @return list of InetSocketAddress
    * @throws IOException on error
    */
-  public static List<InetSocketAddress> getNNServiceRpcAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses(
       Configuration conf) throws IOException {
     // Use default address as fall back
     String defaultAddress;
@@ -439,9 +521,10 @@ public class DFSUtil {
       defaultAddress = null;
     }
     
-    List<InetSocketAddress> addressList = getAddresses(conf, defaultAddress,
+    Map<String, Map<String, InetSocketAddress>> addressList =
+      getAddresses(conf, defaultAddress,
         DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
-    if (addressList == null) {
+    if (addressList.isEmpty()) {
       throw new IOException("Incorrect configuration: namenode address "
           + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "  
           + DFS_NAMENODE_RPC_ADDRESS_KEY
@@ -451,10 +534,154 @@ public class DFSUtil {
   }
   
   /**
-   * Given the InetSocketAddress for any configured communication with a 
-   * namenode, this method returns the corresponding nameservice ID,
-   * by doing a reverse lookup on the list of nameservices until it
-   * finds a match.
+   * Flatten the given map, as returned by other functions in this class,
+   * into a flat list of {@link ConfiguredNNAddress} instances.
+   */
+  public static List<ConfiguredNNAddress> flattenAddressMap(
+      Map<String, Map<String, InetSocketAddress>> map) {
+    List<ConfiguredNNAddress> ret = Lists.newArrayList();
+    
+    for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
+      map.entrySet()) {
+      String nsId = entry.getKey();
+      Map<String, InetSocketAddress> nnMap = entry.getValue();
+      for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+        String nnId = e2.getKey();
+        InetSocketAddress addr = e2.getValue();
+        
+        ret.add(new ConfiguredNNAddress(nsId, nnId, addr));
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Format the given map, as returned by other functions in this class,
+   * into a string suitable for debugging display. The format of this string
+   * should not be considered an interface, and is liable to change.
+   */
+  public static String addressMapToString(
+      Map<String, Map<String, InetSocketAddress>> map) {
+    StringBuilder b = new StringBuilder();
+    for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
+         map.entrySet()) {
+      String nsId = entry.getKey();
+      Map<String, InetSocketAddress> nnMap = entry.getValue();
+      b.append("Nameservice <").append(nsId).append(">:").append("\n");
+      for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+        b.append("  NN ID ").append(e2.getKey())
+          .append(" => ").append(e2.getValue()).append("\n");
+      }
+    }
+    return b.toString();
+  }
+  
+  public static String nnAddressesAsString(Configuration conf) {
+    Map<String, Map<String, InetSocketAddress>> addresses =
+      getHaNnRpcAddresses(conf);
+    return addressMapToString(addresses);
+  }
+
+  /**
+   * Represent one of the NameNodes configured in the cluster.
+   */
+  public static class ConfiguredNNAddress {
+    private final String nameserviceId;
+    private final String namenodeId;
+    private final InetSocketAddress addr;
+
+    private ConfiguredNNAddress(String nameserviceId, String namenodeId,
+        InetSocketAddress addr) {
+      this.nameserviceId = nameserviceId;
+      this.namenodeId = namenodeId;
+      this.addr = addr;
+    }
+
+    public String getNameserviceId() {
+      return nameserviceId;
+    }
+
+    public String getNamenodeId() {
+      return namenodeId;
+    }
+
+    public InetSocketAddress getAddress() {
+      return addr;
+    }
+    
+    @Override
+    public String toString() {
+      return "ConfiguredNNAddress[nsId=" + nameserviceId + ";" +
+        "nnId=" + namenodeId + ";addr=" + addr + "]";
+    }
+  }
+  
+  /**
+   * Get a URI for each configured nameservice. If a nameservice is
+   * HA-enabled, then the logical URI of the nameservice is returned. If the
+   * nameservice is not HA-enabled, then a URI corresponding to an RPC address
+   * of the single NN for that nameservice is returned, preferring the service
+   * RPC address over the client RPC address.
+   * 
+   * @param conf configuration
+   * @return a collection of all configured NN URIs, preferring service
+   *         addresses
+   */
+  public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
+    return getNameServiceUris(conf,
+        DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+  }
+
+  /**
+   * Get a URI for each configured nameservice. If a nameservice is
+   * HA-enabled, then the logical URI of the nameservice is returned. If the
+   * nameservice is not HA-enabled, then a URI corresponding to the address of
+   * the single NN for that nameservice is returned.
+   * 
+   * @param conf configuration
+   * @param keys configuration keys to try in order to get the URI for non-HA
+   *        nameservices
+   * @return a collection of all configured NN URIs
+   */
+  public static Collection<URI> getNameServiceUris(Configuration conf,
+      String... keys) {
+    Set<URI> ret = new HashSet<URI>();
+    for (String nsId : getNameServiceIds(conf)) {
+      if (HAUtil.isHAEnabled(conf, nsId)) {
+        // Add the logical URI of the nameservice.
+        try {
+          ret.add(new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId));
+        } catch (URISyntaxException ue) {
+          throw new IllegalArgumentException(ue);
+        }
+      } else {
+        // Add the URI corresponding to the address of the NN.
+        for (String key : keys) {
+          String addr = conf.get(concatSuffixes(key, nsId));
+          if (addr != null) {
+            ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
+                NetUtils.createSocketAddr(addr)));
+            break;
+          }
+        }
+      }
+    }
+    // Add the generic configuration keys.
+    for (String key : keys) {
+      String addr = conf.get(key);
+      if (addr != null) {
+        ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
+            NetUtils.createSocketAddr(addr)));
+        break;
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Given an InetSocketAddress, this method returns the nameservice Id
+   * corresponding to the key with a matching address, by doing a reverse 
+   * lookup on the list of nameservices until it finds a match.
    * 
    * Since the process of resolving URIs to Addresses is slightly expensive,
    * this utility method should not be used in performance-critical routines.
@@ -472,91 +699,109 @@ public class DFSUtil {
    *     not the NameServiceId-suffixed keys.
    * @return nameserviceId, or null if no match found
    */
-  public static String getNameServiceIdFromAddress(Configuration conf, 
-      InetSocketAddress address, String... keys) {
-    Collection<String> nameserviceIds = getNameServiceIds(conf);
-
+  public static String getNameServiceIdFromAddress(final Configuration conf, 
+      final InetSocketAddress address, String... keys) {
     // Configuration with a single namenode and no nameserviceId
-    if (nameserviceIds == null || nameserviceIds.isEmpty()) {
-      return null;
-    }
-    // Get the candidateAddresses for all the configured nameServiceIds
-    for (String nameserviceId : nameserviceIds) {
-      for (String key : keys) {
-        String candidateAddress = conf.get(
-            getNameServiceIdKey(key, nameserviceId));
-        if (candidateAddress != null
-            && address.equals(NetUtils.createSocketAddr(candidateAddress)))
-          return nameserviceId;
-      }
-    }
-    // didn't find a match
-    return null;
+    String[] ids = getSuffixIDs(conf, address, keys);
+    return (ids != null) ? ids[0] : null;
   }
-
+  
   /**
-   * return server http or https address from the configuration
+   * Return the server http or https address from the configuration for a
+   * given namenode rpc address.
    * @param conf
-   * @param namenode - namenode address
+   * @param namenodeAddr - namenode RPC address
    * @param httpsAddress -If true, and if security is enabled, returns server 
    *                      https address. If false, returns server http address.
    * @return server http or https address
    */
   public static String getInfoServer(
-      InetSocketAddress namenode, Configuration conf, boolean httpsAddress) {
-    String httpAddress = null;
-    
-    String httpAddressKey = (UserGroupInformation.isSecurityEnabled() 
-        && httpsAddress) ? DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY
-        : DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
-    String httpAddressDefault = (UserGroupInformation.isSecurityEnabled() 
-        && httpsAddress) ? DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT
-        : DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
-    if(namenode != null) {
+      InetSocketAddress namenodeAddr, Configuration conf, boolean httpsAddress) {
+    boolean securityOn = UserGroupInformation.isSecurityEnabled();
+    String httpAddressKey = (securityOn && httpsAddress) ? 
+        DFS_NAMENODE_HTTPS_ADDRESS_KEY : DFS_NAMENODE_HTTP_ADDRESS_KEY;
+    String httpAddressDefault = (securityOn && httpsAddress) ? 
+        DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT : DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
+      
+    String[] suffixes;
+    if (namenodeAddr != null) {
       // if non-default namenode, try reverse look up 
       // the nameServiceID if it is available
-      String nameServiceId = DFSUtil.getNameServiceIdFromAddress(
-          conf, namenode,
+      suffixes = getSuffixIDs(conf, namenodeAddr,
           DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
           DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
-
-      if (nameServiceId != null) {
-        httpAddress = conf.get(DFSUtil.getNameServiceIdKey(
-            httpAddressKey, nameServiceId));
-      }
-    }
-    // else - Use non-federation style configuration
-    if (httpAddress == null) {
-      httpAddress = conf.get(httpAddressKey, httpAddressDefault);
+    } else {
+      suffixes = new String[2];
     }
 
-    return httpAddress;
+    return getSuffixedConf(conf, httpAddressKey, httpAddressDefault, suffixes);
   }
   
+
   /**
-   * @return key specific to a nameserviceId from a generic key
-   */
-  public static String getNameServiceIdKey(String key, String nameserviceId) {
-    return key + "." + nameserviceId;
+   * Substitute a default host in the case that an address has been configured
+   * with a wildcard. This is used, for example, when determining the HTTP
+   * address of the NN -- if it's configured to bind to 0.0.0.0, we want to
+   * substitute the hostname from the filesystem URI rather than trying to
+   * connect to 0.0.0.0.
+   * @param configuredAddress the address found in the configuration
+   * @param defaultHost the host to substitute with, if configuredAddress
+   * is a local/wildcard address.
+   * @return the substituted address
+   * @throws IOException if it is a wildcard address and security is enabled
+   */
+  public static String substituteForWildcardAddress(String configuredAddress,
+      String defaultHost) throws IOException {
+    InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
+    if (sockAddr.getAddress().isAnyLocalAddress()) {
+      if(UserGroupInformation.isSecurityEnabled()) {
+        throw new IOException("Cannot use a wildcard address with security. " +
+                              "Must explicitly set bind address for Kerberos");
+      }
+      return defaultHost + ":" + sockAddr.getPort();
+    } else {
+      return configuredAddress;
+    }
+  }
+  
+  private static String getSuffixedConf(Configuration conf,
+      String key, String defaultVal, String[] suffixes) {
+    String ret = conf.get(DFSUtil.addKeySuffixes(key, suffixes));
+    if (ret != null) {
+      return ret;
+    }
+    return conf.get(key, defaultVal);
   }
   
   /**
    * Sets the node specific setting into generic configuration key. Looks up
-   * value of "key.nameserviceId" and if found sets that value into generic key 
-   * in the conf. Note that this only modifies the runtime conf.
+   * value of "key.nameserviceId.namenodeId" and if found sets that value into 
+   * generic key in the conf. If this is not found, falls back to
+   * "key.nameserviceId" and then the unmodified key.
+   *
+   * Note that this only modifies the runtime conf.
    * 
    * @param conf
    *          Configuration object to lookup specific key and to set the value
    *          to the key passed. Note the conf object is modified.
    * @param nameserviceId
-   *          nameservice Id to construct the node specific key.
+   *          nameservice Id to construct the node specific key. Pass null if
+   *          federation is not configured.
+   * @param nnId
+   *          namenode Id to construct the node specific key. Pass null if
+   *          HA is not configured.
    * @param keys
    *          The key for which node specific value is looked up
    */
   public static void setGenericConf(Configuration conf,
-      String nameserviceId, String... keys) {
+      String nameserviceId, String nnId, String... keys) {
     for (String key : keys) {
-      String value = conf.get(getNameServiceIdKey(key, nameserviceId));
+      String value = conf.get(addKeySuffixes(key, nameserviceId, nnId));
+      if (value != null) {
+        conf.set(key, value);
+        continue;
+      }
+      value = conf.get(addKeySuffixes(key, nameserviceId));
       if (value != null) {
         conf.set(key, value);
       }
@@ -581,58 +826,6 @@ public class DFSUtil {
   public static int roundBytesToGB(long bytes) {
     return Math.round((float)bytes/ 1024 / 1024 / 1024);
   }
-
-
-  /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode(Configuration conf)
-      throws IOException {
-    return createNamenode(NameNode.getAddress(conf), conf);
-  }
-
-  /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
-      Configuration conf) throws IOException {   
-    return createNamenode(nameNodeAddr, conf,
-        UserGroupInformation.getCurrentUser());
-  }
-    
-  /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
-      Configuration conf, UserGroupInformation ugi) throws IOException {
-    /** 
-     * Currently we have simply burnt-in support for a SINGLE
-     * protocol - protocolPB. This will be replaced
-     * by a way to pick the right protocol based on the 
-     * version of the target server.  
-     */
-    return new org.apache.hadoop.hdfs.protocolPB.
-        ClientNamenodeProtocolTranslatorPB(nameNodeAddr, conf, ugi);
-  }
-
-  /** Create a {@link NameNode} proxy */
-  static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
-    throws IOException {
-    RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-        5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-    
-    Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
-
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(RemoteException.class, 
-        RetryPolicies.retryByRemoteException(
-            RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
-    
-    methodNameToPolicyMap.put("create", methodPolicy);
-
-    return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
-        rpcNamenode, methodNameToPolicyMap);
-  }
   
   /** Create a {@link ClientDatanodeProtocol} proxy */
   public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
@@ -656,9 +849,9 @@ public class DFSUtil {
       SocketFactory factory) throws IOException {
     return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
   }
-  
+
   /**
-   * Get name service Id for the {@link NameNode} based on namenode RPC address
+   * Get nameservice Id for the {@link NameNode} based on namenode RPC address
    * matching the local node address.
    */
   public static String getNamenodeNameServiceId(Configuration conf) {
@@ -666,7 +859,7 @@ public class DFSUtil {
   }
   
   /**
-   * Get name service Id for the BackupNode based on backup node RPC address
+   * Get nameservice Id for the BackupNode based on backup node RPC address
    * matching the local node address.
    */
   public static String getBackupNameServiceId(Configuration conf) {
@@ -674,7 +867,7 @@ public class DFSUtil {
   }
   
   /**
-   * Get name service Id for the secondary node based on secondary http address
+   * Get nameservice Id for the secondary node based on secondary http address
    * matching the local node address.
    */
   public static String getSecondaryNameServiceId(Configuration conf) {
@@ -686,13 +879,14 @@ public class DFSUtil {
    * the address of the local node. 
    * 
    * If {@link DFSConfigKeys#DFS_FEDERATION_NAMESERVICE_ID} is not specifically
-   * configured, this method determines the nameservice Id by matching the local
-   * nodes address with the configured addresses. When a match is found, it
-   * returns the nameservice Id from the corresponding configuration key.
+   * configured, and more than one nameservice Id is configured, this method 
+   * determines the nameservice Id by matching the local node's address with the
+   * configured addresses. When a match is found, it returns the nameservice Id
+   * from the corresponding configuration key.
    * 
    * @param conf Configuration
    * @param addressKey configuration key to get the address.
-   * @return name service Id on success, null on failure.
+   * @return nameservice Id on success, null if federation is not configured.
    * @throws HadoopIllegalArgumentException on error
    */
   private static String getNameServiceId(Configuration conf, String addressKey) {
@@ -700,34 +894,106 @@ public class DFSUtil {
     if (nameserviceId != null) {
       return nameserviceId;
     }
-    
-    Collection<String> ids = getNameServiceIds(conf);
-    if (ids == null || ids.size() == 0) {
-      // Not federation configuration, hence no nameservice Id
-      return null;
+    Collection<String> nsIds = getNameServiceIds(conf);
+    if (1 == nsIds.size()) {
+      return nsIds.toArray(new String[1])[0];
     }
+    String nnId = conf.get(DFS_HA_NAMENODE_ID_KEY);
     
-    // Match the rpc address with that of local address
+    return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
+  }
+  
+  /**
+   * Returns nameservice Id and namenode Id when the local host matches the
+   * configuration parameter {@code addressKey}.<nameservice Id>.<namenode Id>
+   * 
+   * @param conf Configuration
+   * @param addressKey configuration key corresponding to the address.
+   * @param knownNsId only look at configs for the given nameservice, if non-null
+   * @param knownNNId only look at configs for the given namenode, if non-null
+   * @param matcher matching criteria for matching the address
+   * @return Array with nameservice Id and namenode Id on success. First element
+   *         in the array is nameservice Id and second element is namenode Id.
+   *         A null value indicates that the configuration does not have the
+   *         corresponding Id.
+   * @throws HadoopIllegalArgumentException on error
+   */
+  static String[] getSuffixIDs(final Configuration conf, final String addressKey,
+      String knownNsId, String knownNNId,
+      final AddressMatcher matcher) {
+    String nameserviceId = null;
+    String namenodeId = null;
     int found = 0;
-    for (String id : ids) {
-      String addr = conf.get(getNameServiceIdKey(addressKey, id));
-      InetSocketAddress s = NetUtils.createSocketAddr(addr);
-      if (NetUtils.isLocalAddress(s.getAddress())) {
-        nameserviceId = id;
-        found++;
+    
+    Collection<String> nsIds = getNameServiceIds(conf);
+    for (String nsId : emptyAsSingletonNull(nsIds)) {
+      if (knownNsId != null && !knownNsId.equals(nsId)) {
+        continue;
+      }
+      
+      Collection<String> nnIds = getNameNodeIds(conf, nsId);
+      for (String nnId : emptyAsSingletonNull(nnIds)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
+              addressKey, nsId, nnId));
+        }
+        if (knownNNId != null && !knownNNId.equals(nnId)) {
+          continue;
+        }
+        String key = addKeySuffixes(addressKey, nsId, nnId);
+        String addr = conf.get(key);
+        if (addr == null) {
+          continue;
+        }
+        InetSocketAddress s = null;
+        try {
+          s = NetUtils.createSocketAddr(addr);
+        } catch (Exception e) {
+          LOG.warn("Exception in creating socket address " + addr, e);
+          continue;
+        }
+        if (!s.isUnresolved() && matcher.match(s)) {
+          nameserviceId = nsId;
+          namenodeId = nnId;
+          found++;
+        }
       }
     }
     if (found > 1) { // Only one address must match the local address
-      throw new HadoopIllegalArgumentException(
-          "Configuration has multiple RPC addresses that matches "
-              + "the local node's address. Please configure the system with "
-              + "the parameter " + DFS_FEDERATION_NAMESERVICE_ID);
-    }
-    if (found == 0) {
-      throw new HadoopIllegalArgumentException("Configuration address "
-          + addressKey + " is missing in configuration with name service Id");
+      String msg = "Configuration has multiple addresses that match "
+          + "local node's address. Please configure the system with "
+          + DFS_FEDERATION_NAMESERVICE_ID + " and "
+          + DFS_HA_NAMENODE_ID_KEY;
+      throw new HadoopIllegalArgumentException(msg);
     }
-    return nameserviceId;
+    return new String[] { nameserviceId, namenodeId };
+  }
+  
+  /**
+   * For a given set of {@code keys}, adds the nameservice Id and/or namenode Id
+   * suffixes and returns {nameserviceId, namenodeId} when an address match is found.
+   * @see #getSuffixIDs(Configuration, String, String, String, AddressMatcher)
+   */
+  static String[] getSuffixIDs(final Configuration conf,
+      final InetSocketAddress address, final String... keys) {
+    AddressMatcher matcher = new AddressMatcher() {
+      @Override
+      public boolean match(InetSocketAddress s) {
+        return address.equals(s);
+      } 
+    };
+    
+    for (String key : keys) {
+      String[] ids = getSuffixIDs(conf, key, null, null, matcher);
+      if (ids != null && (ids[0] != null || ids[1] != null)) {
+        return ids;
+      }
+    }
+    return null;
+  }
+  
+  private interface AddressMatcher {
+    public boolean match(InetSocketAddress s);
   }
 
   /** Create a URI from the scheme and address */
@@ -753,4 +1019,39 @@ public class DFSUtil {
     RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
     server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
   }
+
+  /**
+   * Map a logical namenode ID to its service address. Use the given
+   * nameservice if specified, or the configured one if none is given.
+   *
+   * @param conf Configuration
+   * @param nsId which nameservice nnId is a part of, optional
+   * @param nnId the namenode ID to get the service addr for
+   * @return the service addr, null if it could not be determined
+   */
+  public static String getNamenodeServiceAddr(final Configuration conf,
+      String nsId, String nnId) {
+
+    if (nsId == null) {
+      Collection<String> nsIds = getNameServiceIds(conf);
+      if (1 == nsIds.size()) {
+        nsId = nsIds.toArray(new String[1])[0];
+      } else {
+        // No nameservice ID was given and more than one is configured
+        return null;
+      }
+    }
+
+    String serviceAddrKey = concatSuffixes(
+        DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId);
+
+    String addrKey = concatSuffixes(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId);
+
+    String serviceRpcAddr = conf.get(serviceAddrKey);
+    if (serviceRpcAddr == null) {
+      serviceRpcAddr = conf.get(addrKey);
+    }
+    return serviceRpcAddr;
+  }
 }
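
For illustration, a minimal sketch of how the suffixed-key resolution added
above composes. The nameservice/namenode IDs and host names are invented for
the example; the DFSUtil and DFSConfigKeys members are the ones in this diff:

import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;

public class HaAddressResolutionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // One federated nameservice "ns1" with two HA namenodes "nn1" and "nn2".
    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, "ns1");
    conf.set(DFSUtil.addKeySuffixes(
        DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "ns1"), "nn1,nn2");
    conf.set(DFSUtil.addKeySuffixes(
        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"),
        "host1.example.com:8020");
    conf.set(DFSUtil.addKeySuffixes(
        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"),
        "host2.example.com:8020");

    // getHaNnRpcAddresses walks <key>[.<nsId>][.<nnId>] and returns
    // map(nameserviceId -> map(namenodeId -> InetSocketAddress)).
    Map<String, Map<String, InetSocketAddress>> addrs =
        DFSUtil.getHaNnRpcAddresses(conf);
    System.out.print(DFSUtil.addressMapToString(addrs));
  }
}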

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Sun Mar 11 17:55:58 2012
@@ -106,8 +106,7 @@ public class DistributedFileSystem exten
       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
     }
 
-    InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
-    this.dfs = new DFSClient(namenode, conf, statistics);
+    this.dfs = new DFSClient(uri, conf, statistics);
     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
     this.workingDir = getHomeDirectory();
   }
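
As a sketch of what the URI-based DFSClient constructor enables: a caller can
now hand the filesystem a logical HA authority rather than a resolved
host:port. This assumes a nameservice "ns1" (illustrative) configured with a
failover proxy provider as in the DFSUtil changes above:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class LogicalUriSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The authority "ns1" is a logical nameservice Id, not a host:port;
    // the client resolves it through the configured addresses instead of
    // calling NameNode.getAddress() on the authority string.
    FileSystem fs = FileSystem.get(URI.create("hdfs://ns1"), conf);
    System.out.println(fs.getUri());  // hdfs://ns1
  }
}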

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Sun Mar 11 17:55:58 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -44,6 +45,8 @@ public class HDFSPolicyProvider extends 
     new Service("security.inter.datanode.protocol.acl", 
                 InterDatanodeProtocol.class),
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
+    new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
+        HAServiceProtocol.class),
     new Service(
         CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY, 
         RefreshAuthorizationPolicyProtocol.class),
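
A small sketch that lists the service-level ACLs, showing where the new HA
entry surfaces. PolicyProvider#getServices(), Service#getServiceKey() and
Service#getProtocol() are existing Hadoop security APIs:

import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.security.authorize.Service;

public class PolicyListingSketch {
  public static void main(String[] args) {
    // Prints each ACL key and the protocol it guards; the output should
    // now include security.ha.service.protocol.acl -> HAServiceProtocol.
    for (Service s : new HDFSPolicyProvider().getServices()) {
      System.out.println(s.getServiceKey() + " -> "
          + s.getProtocol().getName());
    }
  }
}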

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Sun Mar 11 17:55:58 2012
@@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
  * - followed by the invalid replica represented with three -1s;
  * - followed by the under-construction replica list where each replica is
  *   represented by 4 longs: three for the block id, length, generation 
- *   stamp, and the forth for the replica state.
+ *   stamp, and the fourth for the replica state.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -304,4 +304,16 @@ public class BlockListAsLongs implements
     blockList[idx+1] = -1;
     blockList[idx+2] = -1;
   }
+
+  public long getMaxGsInBlockList() {
+    long maxGs = -1;
+    Iterator<Block> iter = getBlockReportIterator();
+    while (iter.hasNext()) {
+      Block b = iter.next();
+      if (b.getGenerationStamp() > maxGs) {
+        maxGs = b.getGenerationStamp();
+      }
+    }
+    return maxGs;
+  }
 }
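
A minimal sketch of the new accessor, assuming the two-list BlockListAsLongs
constructor and the Block(id, numBytes, generationStamp) constructor from the
surrounding branch-0.23 code; the block values are invented:

import java.util.Arrays;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;

public class MaxGsSketch {
  public static void main(String[] args) {
    // Two finalized replicas and no under-construction replicas.
    BlockListAsLongs report = new BlockListAsLongs(
        Arrays.asList(new Block(1L, 100L, 1001L),
                      new Block(2L, 200L, 1005L)),
        null);
    // Scans the encoded report and returns the largest generation stamp.
    System.out.println(report.getMaxGsInBlockList());  // 1005
  }
}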

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sun Mar 11 17:55:58 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
@@ -114,6 +115,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public LocatedBlocks getBlockLocations(String src,
                                          long offset,
                                          long length) 
@@ -125,6 +127,7 @@ public interface ClientProtocol {
    * @return a set of server default configuration values
    * @throws IOException
    */
+  @Idempotent
   public FsServerDefaults getServerDefaults() throws IOException;
 
   /**
@@ -228,6 +231,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public boolean setReplication(String src, short replication)
       throws AccessControlException, DSQuotaExceededException,
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
@@ -242,6 +246,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException;
@@ -259,12 +264,13 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public void setOwner(String src, String username, String groupname)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException;
 
   /**
-   * The client can give up on a blcok by calling abandonBlock().
+   * The client can give up on a block by calling abandonBlock().
    * The client can then
    * either obtain a new block, or complete or abandon the file.
    * Any partial writes to the block will be discarded.
@@ -331,6 +337,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
       final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName
@@ -368,6 +375,7 @@ public interface ClientProtocol {
    * locations on datanodes).
    * @param blocks Array of located blocks to report
    */
+  @Idempotent
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
 
   ///////////////////////////////////////
@@ -482,6 +490,7 @@ public interface ClientProtocol {
    * RunTimeExceptions:
    * @throws InvalidPathException If <code>src</code> is invalid
    */
+  @Idempotent
   public boolean mkdirs(String src, FsPermission masked, boolean createParent)
       throws AccessControlException, FileAlreadyExistsException,
       FileNotFoundException, NSQuotaExceededException,
@@ -502,6 +511,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public DirectoryListing getListing(String src,
                                      byte[] startAfter,
                                      boolean needLocation)
@@ -531,6 +541,7 @@ public interface ClientProtocol {
    * @throws AccessControlException permission denied
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public void renewLease(String clientName) throws AccessControlException,
       IOException;
 
@@ -543,6 +554,7 @@ public interface ClientProtocol {
    * @return true if the file is already closed
    * @throws IOException
    */
+  @Idempotent
   public boolean recoverLease(String src, String clientName) throws IOException;
 
   public int GET_STATS_CAPACITY_IDX = 0;
@@ -554,7 +566,7 @@ public interface ClientProtocol {
   
   /**
    * Get a set of statistics about the filesystem.
-   * Right now, only three values are returned.
+   * Right now, only seven values are returned.
    * <ul>
    * <li> [0] contains the total storage capacity of the system, in bytes.</li>
    * <li> [1] contains the total used space of the system, in bytes.</li>
@@ -567,6 +579,7 @@ public interface ClientProtocol {
    * Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of 
    * actual numbers to index into the array.
    */
+  @Idempotent
   public long[] getStats() throws IOException;
 
   /**
@@ -575,6 +588,7 @@ public interface ClientProtocol {
    * Return live datanodes if type is LIVE; dead datanodes if type is DEAD;
    * otherwise all datanodes if type is ALL.
    */
+  @Idempotent
   public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type)
       throws IOException;
 
@@ -585,6 +599,7 @@ public interface ClientProtocol {
    * @throws IOException
    * @throws UnresolvedLinkException if the path contains a symlink. 
    */
+  @Idempotent
   public long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException;
 
@@ -700,9 +715,9 @@ public interface ClientProtocol {
    * all corrupt files, call this method repeatedly and each time pass in the
    * cookie returned from the previous call.
    */
-  public CorruptFileBlocks
-    listCorruptFileBlocks(String path, String cookie)
-    throws IOException;
+  @Idempotent
+  public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
+      throws IOException;
   
   /**
    * Dumps namenode data structures into specified file. If the file
@@ -719,6 +734,7 @@ public interface ClientProtocol {
   * @param bandwidth Balancer bandwidth in bytes per second for this datanode.
    * @throws IOException
    */
+  @Idempotent
   public void setBalancerBandwidth(long bandwidth) throws IOException;
   
   /**
@@ -732,6 +748,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException if the path contains a symlink. 
    * @throws IOException If an I/O error occurred        
    */
+  @Idempotent
   public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException;
 
@@ -747,6 +764,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred        
    */
+  @Idempotent
   public HdfsFileStatus getFileLinkInfo(String src)
       throws AccessControlException, UnresolvedLinkException, IOException;
   
@@ -759,6 +777,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException if <code>path</code> contains a symlink. 
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public ContentSummary getContentSummary(String path)
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException;
@@ -784,6 +803,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException if the <code>path</code> contains a symlink. 
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException;
@@ -799,6 +819,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink. 
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public void fsync(String src, String client) 
       throws AccessControlException, FileNotFoundException, 
       UnresolvedLinkException, IOException;
@@ -818,6 +839,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink. 
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public void setTimes(String src, long mtime, long atime)
       throws AccessControlException, FileNotFoundException, 
       UnresolvedLinkException, IOException;
@@ -858,6 +880,7 @@ public interface ClientProtocol {
    * @throws IOException If the given path does not refer to a symlink
    *           or an I/O error occurred
    */
+  @Idempotent
   public String getLinkTarget(String path) throws AccessControlException,
       FileNotFoundException, IOException; 
   
@@ -873,6 +896,7 @@ public interface ClientProtocol {
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
+  @Idempotent
   public LocatedBlock updateBlockForPipeline(ExtendedBlock block,
       String clientName) throws IOException;
 
@@ -896,6 +920,7 @@ public interface ClientProtocol {
    * @return Token<DelegationTokenIdentifier>
    * @throws IOException
    */
+  @Idempotent
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) 
       throws IOException;
 
@@ -906,6 +931,7 @@ public interface ClientProtocol {
    * @return the new expiration time
    * @throws IOException
    */
+  @Idempotent
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException;
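
A sketch of what the @Idempotent markers express: retry layers may safely
re-invoke an annotated method after a timeout, because a repeated call cannot
change the outcome. The interface below is hypothetical, for illustration
only:

import java.io.IOException;

import org.apache.hadoop.io.retry.Idempotent;

interface ExampleProtocol {
  // Reading a length twice yields the same result, so retries are safe.
  @Idempotent
  long getLength(String src) throws IOException;

  // Appending twice would duplicate data, so this stays unannotated.
  void append(String src, byte[] data) throws IOException;
}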
   

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Sun Mar 11 17:55:58 2012
@@ -100,6 +100,14 @@ public class HdfsConstants {
   public static final String HDFS_URI_SCHEME = "hdfs";
 
   /**
+   * A prefix put before the namenode URI inside the "service" field
+   * of a delegation token, indicating that the URI is a logical (HA)
+   * URI.
+   */
+  public static final String HA_DT_SERVICE_PREFIX = "ha-hdfs:";
+
+
+  /**
    * Please see {@link LayoutVersion} on adding new layout version.
    */
   public static final int LAYOUT_VERSION = LayoutVersion
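
A sketch of how the prefix is meant to be used when stamping a delegation
token's "service" field for a logical URI; the nameservice Id "ns1" is
illustrative:

import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.Text;

public class HaTokenServiceSketch {
  public static void main(String[] args) {
    String nsId = "ns1";
    // A logical-URI token service carries the prefix so clients can tell
    // it apart from an ordinary host:port service string.
    Text service = new Text(HdfsConstants.HA_DT_SERVICE_PREFIX + nsId);
    System.out.println(service);  // ha-hdfs:ns1
  }
}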

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Sun Mar 11 17:55:58 2012
@@ -91,7 +91,10 @@ public class LayoutVersion {
     STORED_TXIDS(-37, "Transaction IDs are stored in edits log and image files"),
     TXID_BASED_LAYOUT(-38, "File names in NN Storage are based on transaction IDs"), 
     EDITLOG_OP_OPTIMIZATION(-39,
-        "Use LongWritable and ShortWritable directly instead of ArrayWritable of UTF8");
+        "Use LongWritable and ShortWritable directly instead of ArrayWritable of UTF8"),
+    OPTIMIZE_PERSIST_BLOCKS(-40,
+        "Serialize block lists with delta-encoded variable length ints, " +
+        "add OP_UPDATE_BLOCKS");
     
     final int lv;
     final int ancestorLV;
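
A sketch of how the new feature flag is consulted, assuming the existing
LayoutVersion.supports(Feature, int) helper; layout versions are negative and
decrease as features are added:

import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;

public class LayoutCheckSketch {
  public static void main(String[] args) {
    // An image written at layout -40 carries the delta-encoded block
    // lists and OP_UPDATE_BLOCKS; one written at -39 does not.
    System.out.println(
        LayoutVersion.supports(Feature.OPTIMIZE_PERSIST_BLOCKS, -40));  // true
    System.out.println(
        LayoutVersion.supports(Feature.OPTIMIZE_PERSIST_BLOCKS, -39));  // false
  }
}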

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -45,6 +45,7 @@ import org.apache.hadoop.ipc.ProtobufHel
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@@ -63,7 +64,8 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class ClientDatanodeProtocolTranslatorPB implements
-    ProtocolMetaInterface, ClientDatanodeProtocol, Closeable {
+    ProtocolMetaInterface, ClientDatanodeProtocol,
+    ProtocolTranslator, Closeable {
   public static final Log LOG = LogFactory
       .getLog(ClientDatanodeProtocolTranslatorPB.class);
   
@@ -198,4 +200,9 @@ public class ClientDatanodeProtocolTrans
         ClientDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName);
   }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
 }
\ No newline at end of file
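
A sketch of why the translator now implements ProtocolTranslator: callers
that manage proxy lifecycles (for example RPC.stopProxy()) can unwrap the
translator to reach the underlying PB proxy. The helper below is
hypothetical:

import org.apache.hadoop.ipc.ProtocolTranslator;

public class UnwrapSketch {
  // Peels off any translator layers until the raw RPC proxy remains.
  static Object unwrap(Object proxy) {
    while (proxy instanceof ProtocolTranslator) {
      proxy = ((ProtocolTranslator) proxy).getUnderlyingProxyObject();
    }
    return proxy;
  }
}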

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -20,15 +20,10 @@ package org.apache.hadoop.hdfs.protocolP
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -49,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -137,52 +133,14 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class ClientNamenodeProtocolTranslatorPB implements
-    ProtocolMetaInterface, ClientProtocol, Closeable {
+    ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
 
-  private static ClientNamenodeProtocolPB createNamenode(
-      InetSocketAddress nameNodeAddr, Configuration conf,
-      UserGroupInformation ugi) throws IOException {
-    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    return RPC.getProxy(ClientNamenodeProtocolPB.class,
-        RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), nameNodeAddr, ugi, conf,
-        NetUtils.getSocketFactory(conf, ClientNamenodeProtocolPB.class));
-  }
-
-  /** Create a {@link NameNode} proxy */
-  static ClientNamenodeProtocolPB createNamenodeWithRetry(
-      ClientNamenodeProtocolPB rpcNamenode) {
-    RetryPolicy createPolicy = RetryPolicies
-        .retryUpToMaximumCountWithFixedSleep(5,
-            HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-
-    Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap 
-        = new HashMap<Class<? extends Exception>, RetryPolicy>();
-    remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
-        createPolicy);
-
-    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
-        .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-            remoteExceptionToPolicyMap));
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
-
-    methodNameToPolicyMap.put("create", methodPolicy);
-
-    return (ClientNamenodeProtocolPB) RetryProxy.create(
-        ClientNamenodeProtocolPB.class, rpcNamenode, methodNameToPolicyMap);
-  }
-
-  public ClientNamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
-      Configuration conf, UserGroupInformation ugi) throws IOException {
-    
-    rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
+  public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy)
+      throws IOException {
+    rpcProxy = proxy;
   }
-
+  
   public void close() {
     RPC.stopProxy(rpcProxy);
   }
@@ -866,4 +824,9 @@ public class ClientNamenodeProtocolTrans
         ClientNamenodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
   }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
 }

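With the retry and proxy-construction code deleted from the translator, its constructor now just wraps a proxy that the caller builds. A hedged sketch of that caller-side wiring, assembled from the RPC calls removed above (the factory that actually does this in the commit lives elsewhere). The same constructor-injection pattern applies to the GetUserMappings, Journal, Namenode, RefreshAuthorizationPolicy, and RefreshUserMappings translators further down.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.ipc.ProtobufRpcEngine;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.UserGroupInformation;

    // Illustrative factory method: build the PB proxy first, then hand it
    // to the translator, which no longer knows how proxies are created.
    static ClientProtocol createTranslator(InetSocketAddress nameNodeAddr,
        Configuration conf, UserGroupInformation ugi) throws IOException {
      RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
          ProtobufRpcEngine.class);
      ClientNamenodeProtocolPB proxy = RPC.getProxy(
          ClientNamenodeProtocolPB.class,
          RPC.getProtocolVersion(ClientNamenodeProtocolPB.class),
          nameNodeAddr, ugi, conf,
          NetUtils.getSocketFactory(conf, ClientNamenodeProtocolPB.class));
      return new ClientNamenodeProtocolTranslatorPB(proxy);
    }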
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHeartbeatProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -161,7 +163,7 @@ public class DatanodeProtocolClientSideT
   }
 
   @Override
-  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
+  public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
       StorageReport[] reports, int xmitsInProgress, int xceiverCount,
       int failedVolumes) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
@@ -184,7 +186,7 @@ public class DatanodeProtocolClientSideT
       cmds[index] = PBHelper.convert(p);
       index++;
     }
-    return cmds;
+    return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()));
   }
 
   @Override

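sendHeartbeat now returns a HeartbeatResponse that bundles the usual DatanodeCommand array with the NameNode's HA state, so one round trip tells the datanode both what to do and whether its NameNode is active. A hedged sketch of datanode-side consumption — the two handler methods at the end are hypothetical; the getters are the ones used by the server-side translator in the next file:

    // Assumes registration, reports, etc. are already in scope.
    HeartbeatResponse resp = datanodeProtocol.sendHeartbeat(
        registration, reports, xmitsInProgress, xceiverCount, failedVolumes);
    DatanodeCommand[] cmds = resp.getCommands();
    NNHAStatusHeartbeat haStatus = resp.getNameNodeHaState();
    if (cmds != null) {
      for (DatanodeCommand cmd : cmds) {
        if (cmd != null) {
          processCommand(cmd);       // hypothetical command dispatcher
        }
      }
    }
    recordNameNodeHaState(haStatus); // hypothetical HA-state tracker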
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -103,7 +104,7 @@ public class DatanodeProtocolServerSideT
   @Override
   public HeartbeatResponseProto sendHeartbeat(RpcController controller,
       HeartbeatRequestProto request) throws ServiceException {
-    DatanodeCommand[] cmds = null;
+    HeartbeatResponse response;
     try {
       List<StorageReportProto> list = request.getReportsList();
       StorageReport[] report = new StorageReport[list.size()];
@@ -113,7 +114,7 @@ public class DatanodeProtocolServerSideT
             p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
             p.getBlockPoolUsed());
       }
-      cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
+      response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
           report, request.getXmitsInProgress(), request.getXceiverCount(),
           request.getFailedVolumes());
     } catch (IOException e) {
@@ -121,6 +122,7 @@ public class DatanodeProtocolServerSideT
     }
     HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
         .newBuilder();
+    DatanodeCommand[] cmds = response.getCommands();
     if (cmds != null) {
       for (int i = 0; i < cmds.length; i++) {
         if (cmds[i] != null) {
@@ -128,6 +130,7 @@ public class DatanodeProtocolServerSideT
         }
       }
     }
+    builder.setHaStatus(PBHelper.convert(response.getNameNodeHaState()));
     return builder.build();
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -20,22 +20,15 @@ package org.apache.hadoop.hdfs.protocolP
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 import com.google.protobuf.RpcController;
@@ -47,16 +40,10 @@ public class GetUserMappingsProtocolClie
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
   private final GetUserMappingsProtocolPB rpcProxy;
-
+  
   public GetUserMappingsProtocolClientSideTranslatorPB(
-      InetSocketAddress nameNodeAddr, UserGroupInformation ugi,
-      Configuration conf) throws IOException {
-    RPC.setProtocolEngine(conf, GetUserMappingsProtocolPB.class,
-        ProtobufRpcEngine.class);
-    rpcProxy = RPC.getProxy(GetUserMappingsProtocolPB.class,
-        RPC.getProtocolVersion(GetUserMappingsProtocolPB.class),
-        NameNode.getAddress(conf), ugi, conf,
-        NetUtils.getSocketFactory(conf, GetUserMappingsProtocol.class));
+      GetUserMappingsProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -19,17 +19,14 @@ package org.apache.hadoop.hdfs.protocolP
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -52,12 +49,9 @@ public class JournalProtocolTranslatorPB
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
   private final JournalProtocolPB rpcProxy;
-
-  public JournalProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
-      Configuration conf) throws IOException {
-    RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
-    rpcProxy = RPC.getProxy(JournalProtocolPB.class,
-        RPC.getProtocolVersion(JournalProtocolPB.class), nameNodeAddr, conf);
+  
+  public JournalProtocolTranslatorPB(JournalProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -19,11 +19,9 @@ package org.apache.hadoop.hdfs.protocolP
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
@@ -47,14 +45,11 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -84,15 +79,6 @@ public class NamenodeProtocolTranslatorP
       VersionRequestProto.newBuilder().build();
 
   final private NamenodeProtocolPB rpcProxy;
-
-  public NamenodeProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
-      Configuration conf, UserGroupInformation ugi) throws IOException {
-    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    rpcProxy = RPC.getProxy(NamenodeProtocolPB.class,
-        RPC.getProtocolVersion(NamenodeProtocolPB.class), nameNodeAddr, ugi,
-        conf, NetUtils.getSocketFactory(conf, NamenodeProtocolPB.class));
-  }
   
   public NamenodeProtocolTranslatorPB(NamenodeProtocolPB rpcProxy) {
     this.rpcProxy = rpcProxy;
@@ -137,7 +123,6 @@ public class NamenodeProtocolTranslatorP
   }
 
   @Override
-  @SuppressWarnings("deprecation")
   public CheckpointSignature rollEditLog() throws IOException {
     try {
       return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Sun Mar 11 17:55:58 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto.StorageState;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHeartbeatProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
@@ -119,7 +120,9 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -819,6 +822,23 @@ public class PBHelper {
     ReceivedDeletedBlockInfoProto.Builder builder = 
         ReceivedDeletedBlockInfoProto.newBuilder();
     
+    ReceivedDeletedBlockInfoProto.BlockStatus status;
+    switch (receivedDeletedBlockInfo.getStatus()) {
+    case RECEIVING_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVING;
+      break;
+    case RECEIVED_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.RECEIVED;
+      break;
+    case DELETED_BLOCK:
+      status = ReceivedDeletedBlockInfoProto.BlockStatus.DELETED;
+      break;
+    default:
+      throw new IllegalArgumentException("Bad status: " +
+          receivedDeletedBlockInfo.getStatus());
+    }
+    builder.setStatus(status);
+    
     if (receivedDeletedBlockInfo.getDelHints() != null) {
       builder.setDeleteHint(receivedDeletedBlockInfo.getDelHints());
     }
@@ -850,7 +870,21 @@ public class PBHelper {
 
   public static ReceivedDeletedBlockInfo convert(
       ReceivedDeletedBlockInfoProto proto) {
-    return new ReceivedDeletedBlockInfo(PBHelper.convert(proto.getBlock()),
+    ReceivedDeletedBlockInfo.BlockStatus status = null;
+    switch (proto.getStatus()) {
+    case RECEIVING:
+      status = BlockStatus.RECEIVING_BLOCK;
+      break;
+    case RECEIVED:
+      status = BlockStatus.RECEIVED_BLOCK;
+      break;
+    case DELETED:
+      status = BlockStatus.DELETED_BLOCK;
+      break;
+    }
+    return new ReceivedDeletedBlockInfo(
+        PBHelper.convert(proto.getBlock()),
+        status,
         proto.hasDeleteHint() ? proto.getDeleteHint() : null);
   }
   
@@ -1245,6 +1279,37 @@ public class PBHelper {
         build();
   }
 
+  public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) {
+    if (s == null) return null;
+    switch (s.getState()) {
+    case ACTIVE:
+      return new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, s.getTxid());
+    case STANDBY:
+      return new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.STANDBY, s.getTxid());
+    default:
+      throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" + s.getState());
+    }
+  }
+
+  public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) {
+    if (hb == null) return null;
+    NNHAStatusHeartbeatProto.Builder builder =
+      NNHAStatusHeartbeatProto.newBuilder();
+    switch (hb.getState()) {
+      case ACTIVE:
+        builder.setState(NNHAStatusHeartbeatProto.State.ACTIVE);
+        break;
+      case STANDBY:
+        builder.setState(NNHAStatusHeartbeatProto.State.STANDBY);
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" +
+            hb.getState());
+    }
+    builder.setTxid(hb.getTxId());
+    return builder.build();
+  }
+
   public static DatanodeStorageProto convert(DatanodeStorage s) {
     return DatanodeStorageProto.newBuilder()
         .setState(PBHelper.convert(s.getState()))

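The two new NNHAStatusHeartbeat converters are intended to be inverses, mirroring the BlockStatus mapping added earlier in this file. A quick hedged round-trip check — the NNHAStatusHeartbeat constructor and getters are the ones the convert() code above already uses:

    NNHAStatusHeartbeat in =
        new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, 42L);
    NNHAStatusHeartbeatProto proto = PBHelper.convert(in);
    NNHAStatusHeartbeat out = PBHelper.convert(proto);
    assert out.getState() == NNHAStatusHeartbeat.State.ACTIVE;
    assert out.getTxId() == 42L;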
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -20,21 +20,15 @@ package org.apache.hadoop.hdfs.protocolP
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.proto.RefreshAuthorizationPolicyProtocolProtos.RefreshServiceAclRequestProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 
 import com.google.protobuf.RpcController;
@@ -46,16 +40,10 @@ public class RefreshAuthorizationPolicyP
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
   private final RefreshAuthorizationPolicyProtocolPB rpcProxy;
-
+  
   public RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(
-      InetSocketAddress nameNodeAddr, UserGroupInformation ugi,
-      Configuration conf) throws IOException {
-    RPC.setProtocolEngine(conf, RefreshAuthorizationPolicyProtocolPB.class,
-        ProtobufRpcEngine.class);
-    rpcProxy = RPC.getProxy(RefreshAuthorizationPolicyProtocolPB.class,
-        RPC.getProtocolVersion(RefreshAuthorizationPolicyProtocolPB.class),
-        NameNode.getAddress(conf), ugi, conf,
-        NetUtils.getSocketFactory(conf, RefreshAuthorizationPolicyProtocol.class));
+      RefreshAuthorizationPolicyProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java Sun Mar 11 17:55:58 2012
@@ -20,23 +20,17 @@ package org.apache.hadoop.hdfs.protocolP
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsRequestProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -47,16 +41,10 @@ public class RefreshUserMappingsProtocol
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
   private final RefreshUserMappingsProtocolPB rpcProxy;
-
+  
   public RefreshUserMappingsProtocolClientSideTranslatorPB(
-      InetSocketAddress nameNodeAddr, UserGroupInformation ugi,
-      Configuration conf) throws IOException {
-    RPC.setProtocolEngine(conf, RefreshUserMappingsProtocolPB.class,
-        ProtobufRpcEngine.class);
-    rpcProxy = RPC.getProxy(RefreshUserMappingsProtocolPB.class,
-        RPC.getProtocolVersion(RefreshUserMappingsProtocolPB.class),
-        NameNode.getAddress(conf), ugi, conf,
-        NetUtils.getSocketFactory(conf, RefreshUserMappingsProtocol.class));
+      RefreshUserMappingsProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java Sun Mar 11 17:55:58 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.security.
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 
@@ -283,7 +284,18 @@ public class DelegationTokenSecretManage
   @Override //AbstractDelegationTokenManager
   protected void logUpdateMasterKey(DelegationKey key)
       throws IOException {
-    namesystem.logUpdateMasterKey(key);
+    synchronized (noInterruptsLock) {
+      // The edit logging code will fail catastrophically if it
+      // is interrupted during a logSync, since the interrupt
+      // closes the edit log files. Doing this inside the
+      // above lock and then checking interruption status
+      // prevents this bug.
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException(
+            "Interrupted before updating master key");
+      }
+      namesystem.logUpdateMasterKey(key);
+    }
   }
 
   /** A utility method for creating credentials. */

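The comment in the hunk above explains the race: an interrupt delivered during logSync() closes the edit log out from under the writer. The check only helps because the interrupting thread is expected to take the same noInterruptsLock before calling interrupt(). The general shape of the pattern, as a hedged sketch — method and field names are illustrative except noInterruptsLock, which appears in the hunk:

    import java.io.InterruptedIOException;

    private final Object noInterruptsLock = new Object();

    // Any thread that wants to interrupt this one must synchronize on
    // noInterruptsLock around its interrupt() call; then an interrupt can
    // never arrive between the check below and the edit-log write.
    void guardedEditLogWrite(Runnable editOp) throws InterruptedIOException {
      synchronized (noInterruptsLock) {
        if (Thread.interrupted()) { // clears the flag as it checks
          throw new InterruptedIOException(
              "Interrupted before edit log write");
        }
        editOp.run();
      }
    }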
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java Sun Mar 11 17:55:58 2012
@@ -59,6 +59,11 @@ public class DelegationTokenSelector
         new InetSocketAddress(nnAddr.getHostName(), nnRpcPort));
     return INSTANCE.selectToken(serviceName, ugi.getTokens());
   }
+  
+  public static Token<DelegationTokenIdentifier> selectHdfsDelegationToken(
+      Text serviceName, UserGroupInformation ugi) {
+    return INSTANCE.selectToken(serviceName, ugi.getTokens());
+  }
 
   public DelegationTokenSelector() {
     super(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);

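The new selectHdfsDelegationToken overload skips address resolution: a caller that already holds the token's service name, rather than a NameNode socket address, can select directly. A hedged usage sketch; the service string is illustrative:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    Text service = new Text("ha-hdfs:mycluster"); // illustrative service name
    // getCurrentUser() throws IOException; handling omitted in this sketch.
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    Token<DelegationTokenIdentifier> token =
        DelegationTokenSelector.selectHdfsDelegationToken(service, ugi);
    if (token != null) {
      // present the selected HDFS delegation token to the RPC layer
    }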