hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1099687 [2/15] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/pro...
Date Thu, 05 May 2011 05:40:13 GMT
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSUtil.java Thu May  5 05:40:07 2011
@@ -18,20 +18,31 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Comparator;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.UserGroupInformation;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
 @InterfaceAudience.Private
 public class DFSUtil {
+  
   /**
    * Comparator for sorting DatanodeInfo[] based on decommissioned states.
    * Decommissioned nodes are moved to the end of the array on sorting with
@@ -245,6 +256,324 @@ public class DFSUtil {
     return blkLocations;
   }
 
+  /**
+   * Returns collection of nameservice Ids from the configuration.
+   * @param conf configuration
+   * @return collection of nameservice Ids
+   */
+  public static Collection<String> getNameServiceIds(Configuration conf) {
+    return conf.getStringCollection(DFS_FEDERATION_NAMESERVICES);
+  }
 
-}
+  /**
+   * Given a list of keys in order of preference, returns the value of the
+   * first key found in the configuration.
+   * @param defaultValue default value to return when none of the keys is found
+   * @param keySuffix suffix to add to the key, if it is not null
+   * @param conf Configuration
+   * @param keys list of keys in the order of preference
+   * @return value of the first key found, or the default if none was found
+   */
+  private static String getConfValue(String defaultValue, String keySuffix,
+      Configuration conf, String... keys) {
+    String value = null;
+    for (String key : keys) {
+      if (keySuffix != null) {
+        key += "." + keySuffix;
+      }
+      value = conf.get(key);
+      if (value != null) {
+        break;
+      }
+    }
+    if (value == null) {
+      value = defaultValue;
+    }
+    return value;
+  }
+  
+  /**
+   * Returns list of InetSocketAddress for a given set of keys.
+   * @param conf configuration
+   * @param defaultAddress default address to return in case key is not found
+   * @param keys Set of keys to look for in the order of preference
+   * @return list of InetSocketAddress corresponding to the key
+   */
+  private static List<InetSocketAddress> getAddresses(Configuration conf,
+      String defaultAddress, String... keys) {
+    Collection<String> nameserviceIds = getNameServiceIds(conf);
+    List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
+
+    // Configuration with a single namenode
+    if (nameserviceIds == null || nameserviceIds.isEmpty()) {
+      String address = getConfValue(defaultAddress, null, conf, keys);
+      if (address == null) {
+        return null;
+      }
+      isas.add(NetUtils.createSocketAddr(address));
+    } else {
+      // Get the namenodes for all the configured nameServiceIds
+      for (String nameserviceId : nameserviceIds) {
+        String address = getConfValue(null, nameserviceId, conf, keys);
+        if (address == null) {
+          return null;
+        }
+        isas.add(NetUtils.createSocketAddr(address));
+      }
+    }
+    return isas;
+  }
+  
+  /**
+   * Returns list of InetSocketAddresses corresponding to backup node RPC
+   * addresses from the configuration.
+   * 
+   * @param conf configuration
+   * @return list of InetSocketAddresses
+   * @throws IOException on error
+   */
+  public static List<InetSocketAddress> getBackupNodeAddresses(
+      Configuration conf) throws IOException {
+    List<InetSocketAddress> addressList = getAddresses(conf,
+        null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
+    if (addressList == null) {
+      throw new IOException("Incorrect configuration: backup node address "
+          + DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
+    }
+    return addressList;
+  }
 
+  /**
+   * Returns list of InetSocketAddresses corresponding to secondary namenode
+   * HTTP addresses from the configuration.
+   * 
+   * @param conf configuration
+   * @return list of InetSocketAddresses
+   * @throws IOException on error
+   */
+  public static List<InetSocketAddress> getSecondaryNameNodeAddresses(
+      Configuration conf) throws IOException {
+    List<InetSocketAddress> addressList = getAddresses(conf, null,
+        DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
+    if (addressList == null) {
+      throw new IOException("Incorrect configuration: secondary namenode address "
+          + DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
+    }
+    return addressList;
+  }
+
+  /**
+   * Returns list of InetSocketAddresses corresponding to namenodes from the
+   * configuration. Note this is to be used by datanodes to get the list of
+   * namenode addresses to talk to.
+   * 
+   * Returns the namenode addresses specifically configured for datanodes
+   * (using service ports), if found. Otherwise, the regular RPC addresses
+   * configured for other clients are returned.
+   * 
+   * @param conf configuration
+   * @return list of InetSocketAddress
+   * @throws IOException on error
+   */
+  public static List<InetSocketAddress> getNNServiceRpcAddresses(
+      Configuration conf) throws IOException {
+    // Use default address as a fallback
+    String defaultAddress;
+    try {
+      defaultAddress = NameNode.getHostPortString(NameNode.getAddress(conf));
+    } catch (IllegalArgumentException e) {
+      defaultAddress = null;
+    }
+    
+    List<InetSocketAddress> addressList = getAddresses(conf, defaultAddress,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
+    if (addressList == null) {
+      throw new IOException("Incorrect configuration: namenode address "
+          + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "  
+          + DFS_NAMENODE_RPC_ADDRESS_KEY
+          + " is not configured.");
+    }
+    return addressList;
+  }
+  
+  /**
+   * Given the InetSocketAddress for any configured communication with a 
+   * namenode, this method returns the corresponding nameservice ID,
+   * by doing a reverse lookup on the list of nameservices until it
+   * finds a match.
+   * If null is returned, client should try {@link #isDefaultNamenodeAddress}
+   * to check pre-Federated configurations.
+   * Since the process of resolving URIs to Addresses is slightly expensive,
+   * this utility method should not be used in performance-critical routines.
+   * 
+   * @param conf - configuration
+   * @param address - InetSocketAddress for configured communication with NN.
+   *     Configured addresses are typically given as URIs, but we may have to
+   *     compare against a URI typed in by a human, or the server name may be
+   *     aliased, so we compare unambiguous InetSocketAddresses instead of just
+   *     comparing URI substrings.
+   * @param keys - list of configured communication parameters that should
+   *     be checked for matches.  For example, to compare against RPC addresses,
+   *     provide the list DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
+   *     DFS_NAMENODE_RPC_ADDRESS_KEY.  Use the generic parameter keys,
+   *     not the NameServiceId-suffixed keys.
+   * @return nameserviceId, or null if no match found
+   */
+  public static String getNameServiceIdFromAddress(Configuration conf, 
+      InetSocketAddress address, String... keys) {
+    Collection<String> nameserviceIds = getNameServiceIds(conf);
+
+    // Configuration with a single namenode and no nameserviceId
+    if (nameserviceIds == null || nameserviceIds.isEmpty()) {
+      // client should try {@link isDefaultNamenodeAddress} instead
+      return null;
+    }
+    // Get the candidateAddresses for all the configured nameServiceIds
+    for (String nameserviceId : nameserviceIds) {
+      for (String key : keys) {
+        String candidateAddress = conf.get(
+            getNameServiceIdKey(key, nameserviceId));
+        if (candidateAddress != null
+            && address.equals(NetUtils.createSocketAddr(candidateAddress)))
+          return nameserviceId;
+      }
+    }
+    // didn't find a match
+    // client should try {@link isDefaultNamenodeAddress} instead
+    return null;
+  }
+
+  /**
+   * Returns the HTTP server (info server) address from the configuration.
+   * @param namenode - namenode address
+   * @param conf - configuration
+   * @return HTTP server address as a host:port string
+   */
+  public static String getInfoServer(
+      InetSocketAddress namenode, Configuration conf) {
+    String httpAddress = null;
+    
+    String httpAddressKey = UserGroupInformation.isSecurityEnabled() ?
+        DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY
+        : DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+    String httpAddressDefault = UserGroupInformation.isSecurityEnabled() ?
+        DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT 
+        : DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
+    
+    if(namenode != null) {
+      // if non-default namenode, try a reverse lookup of the
+      // nameServiceId, if it is available
+      String nameServiceId = DFSUtil.getNameServiceIdFromAddress(
+          conf, namenode,
+          DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+
+      if (nameServiceId != null) {
+        httpAddress = conf.get(DFSUtil.getNameServiceIdKey(
+            httpAddressKey, nameServiceId));
+      }
+    }
+    // else - Use non-federation style configuration
+    if (httpAddress == null) {
+      httpAddress = conf.get(httpAddressKey, httpAddressDefault);
+    }
+
+    return httpAddress;
+  }
+  
+  /**
+   * Given the InetSocketAddress for any configured communication with a 
+   * namenode, this method determines whether it is the configured
+   * communication channel for the "default" namenode.
+   * It does a reverse lookup on the list of default communication parameters
+   * to see if the given address matches any of them.
+   * Since the process of resolving URIs to Addresses is slightly expensive,
+   * this utility method should not be used in performance-critical routines.
+   * 
+   * @param conf - configuration
+   * @param address - InetSocketAddress for configured communication with NN.
+   *     Configured addresses are typically given as URIs, but we may have to
+   *     compare against a URI typed in by a human, or the server name may be
+   *     aliased, so we compare unambiguous InetSocketAddresses instead of just
+   *     comparing URI substrings.
+   * @param keys - list of configured communication parameters that should
+   *     be checked for matches.  For example, to compare against RPC addresses,
+   *     provide the list DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
+   *     DFS_NAMENODE_RPC_ADDRESS_KEY
+   * @return - true if the given address matches one of the generic parameters
+   */
+  public static boolean isDefaultNamenodeAddress(Configuration conf,
+      InetSocketAddress address, String... keys) {
+    for (String key : keys) {
+      String candidateAddress = conf.get(key);
+      if (candidateAddress != null
+          && address.equals(NetUtils.createSocketAddr(candidateAddress)))
+        return true;
+    }
+    return false;
+  }
+  
+  /**
+   * @return key specific to a nameserviceId from a generic key
+   */
+  public static String getNameServiceIdKey(String key, String nameserviceId) {
+    return key + "." + nameserviceId;
+  }
+  
+  /**
+   * Copies a nameservice-specific setting into the corresponding generic
+   * configuration key. Looks up the value of "key.nameserviceId" and, if
+   * found, sets it as the value of the generic key in the conf. Note that
+   * this only modifies the runtime conf.
+   * 
+   * @param conf
+   *          Configuration object to lookup specific key and to set the value
+   *          to the key passed. Note the conf object is modified.
+   * @param nameserviceId
+   *          nameservice Id to construct the node specific key.
+   * @param keys
+   *          The keys for which nameservice-specific values are looked up
+   */
+  public static void setGenericConf(Configuration conf,
+      String nameserviceId, String... keys) {
+    for (String key : keys) {
+      String value = conf.get(getNameServiceIdKey(key, nameserviceId));
+      if (value != null) {
+        conf.set(key, value);
+      }
+    }
+  }
+  
+  /**
+   * Returns the configured nameservice Id
+   * 
+   * @param conf
+   *          Configuration object to lookup the nameserviceId
+   * @return nameserviceId string from conf
+   */
+  public static String getNameServiceId(Configuration conf) {
+    return conf.get(DFS_FEDERATION_NAMESERVICE_ID);
+  }
+  
+  /** Return used space as a percentage of capacity */
+  public static float getPercentUsed(long used, long capacity) {
+    return capacity <= 0 ? 100 : ((float)used * 100.0f)/(float)capacity; 
+  }
+  
+  /** Return remaining space as a percentage of capacity */
+  public static float getPercentRemaining(long remaining, long capacity) {
+    return capacity <= 0 ? 0 : ((float)remaining * 100.0f)/(float)capacity; 
+  }
+
+  /**
+   * @param address address in host:port format
+   * @return InetSocketAddress for the address
+   */
+  public static InetSocketAddress getSocketAddress(String address) {
+    int colon = address.indexOf(":");
+    if (colon < 0) {
+      return new InetSocketAddress(address, 0);
+    }
+    return new InetSocketAddress(address.substring(0, colon), 
+        Integer.parseInt(address.substring(colon + 1)));
+  }
+}

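For readers following the new federation plumbing in DFSUtil: per-nameservice values are resolved by appending "." + nameserviceId to a generic key (getNameServiceIdKey), with the service RPC key preferred over the client RPC key. A minimal sketch of how this plays out for a two-nameservice deployment; the literal key strings are assumptions inferred from the DFSConfigKeys constants referenced above, not spelled out in this commit:

    import java.net.InetSocketAddress;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;

    public class FederationConfigSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Two nameservices, each with its own client RPC address
        conf.set("dfs.federation.nameservices", "ns1,ns2");
        conf.set("dfs.namenode.rpc-address.ns1", "nn1.example.com:8020");
        conf.set("dfs.namenode.rpc-address.ns2", "nn2.example.com:8020");

        // For each nameservice, getNNServiceRpcAddresses first tries the
        // service RPC key ("dfs.namenode.servicerpc-address.ns1"), then
        // falls back to the client RPC key set above.
        List<InetSocketAddress> nns = DFSUtil.getNNServiceRpcAddresses(conf);
        System.out.println(nns); // [nn1.example.com:8020, nn2.example.com:8020]
      }
    }
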
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu May  5 05:40:07 2011
@@ -46,9 +46,9 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
@@ -233,7 +233,7 @@ public class DistributedFileSystem exten
       Progressable progress) throws IOException {
 
     statistics.incrementWriteOps(1);
-    DFSOutputStream op = (DFSOutputStream)dfs.append(getPathName(f), bufferSize, progress);
+    final DFSOutputStream op = dfs.append(getPathName(f), bufferSize, progress);
     return new FSDataOutputStream(op, statistics, op.getInitialLen());
   }
 
@@ -701,7 +701,7 @@ public class DistributedFileSystem exten
 
     // Find block in data stream.
     DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
-    Block dataBlock = dfsIn.getCurrentBlock();
+    ExtendedBlock dataBlock = dfsIn.getCurrentBlock();
     if (dataBlock == null) {
       LOG.error("Error: Current block in data stream is null! ");
       return false;
@@ -714,7 +714,7 @@ public class DistributedFileSystem exten
 
     // Find block in checksum stream
     DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
-    Block sumsBlock = dfsSums.getCurrentBlock();
+    ExtendedBlock sumsBlock = dfsSums.getCurrentBlock();
     if (sumsBlock == null) {
       LOG.error("Error: Current block in checksum stream is null! ");
       return false;

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Thu May  5 05:40:07 2011
@@ -138,12 +138,21 @@ public class HftpFileSystem extends File
     setConf(conf);
     this.ugi = UserGroupInformation.getCurrentUser(); 
     nnAddr = NetUtils.createSocketAddr(name.toString());
-   
-    nnHttpUrl = buildUri("https://", 
-                         NetUtils.normalizeHostName(name.getHost()), 
-                         conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY, 
-                                     DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT));
+    
+    // In case we open a connection to the hftp of a different cluster,
+    // we need to know that cluster's HTTPS port. If it is not set, we
+    // assume the same cluster (and hence the same port).
+    int urlPort = conf.getInt("dfs.hftp.https.port", -1);
+    if(urlPort == -1)
+      urlPort = conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY, 
+          DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
+
+    nnHttpUrl = 
+      buildUri("https://", NetUtils.normalizeHostName(name.getHost()), urlPort);
+    LOG.debug("Using url to get DT: " + nnHttpUrl);
 
     // if one uses an RPC port different from the default one,
     // one should specify what the service name for this delegation token is;
     // otherwise it is hostname:RPC_PORT

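The new fallback above lets a client fetch delegation tokens over HTTPS from a remote cluster whose port differs from the local default. A hedged illustration; "dfs.hftp.https.port" is the key introduced in this hunk, the port value is invented:

    Configuration conf = new Configuration();
    // The remote cluster serves HTTPS on a non-default port:
    conf.set("dfs.hftp.https.port", "50470");
    // If the key is absent, the code falls back to
    // DFSConfigKeys.DFS_HTTPS_PORT_KEY and then to its compiled-in
    // default, i.e. it assumes the remote cluster matches this one.
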
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/Block.java Thu May  5 05:40:07 2011
@@ -153,13 +153,23 @@ public class Block implements Writable, 
   /////////////////////////////////////
   // Writable
   /////////////////////////////////////
+  @Override // Writable
   public void write(DataOutput out) throws IOException {
+    writeHelper(out);
+  }
+
+  @Override // Writable
+  public void readFields(DataInput in) throws IOException {
+    readHelper(in);
+  }
+  
+  final void writeHelper(DataOutput out) throws IOException {
     out.writeLong(blockId);
     out.writeLong(numBytes);
     out.writeLong(generationStamp);
   }
-
-  public void readFields(DataInput in) throws IOException {
+  
+  final void readHelper(DataInput in) throws IOException {
     this.blockId = in.readLong();
     this.numBytes = in.readLong();
     this.generationStamp = in.readLong();

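Extracting writeHelper/readHelper keeps the field-level serialization in one place while letting a wrapper in the same package reuse it without dispatching back into the overridden write/readFields. A sketch of the kind of reuse this presumably enables; ExtendedBlock's actual Writable code is not part of this hunk, and poolId is an assumed field name:

    // Hypothetical ExtendedBlock.write, in org.apache.hadoop.hdfs.protocol
    // (writeHelper is package-private, so same-package access is required).
    @Override
    public void write(DataOutput out) throws IOException {
      Text.writeString(out, poolId);  // block pool id first
      block.writeHelper(out);         // then blockId, numBytes, generationStamp
    }
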
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Thu May  5 05:40:07 2011
@@ -23,23 +23,47 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.TokenInfo;
 
 /** A client-datanode protocol for block recovery
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 6: recoverBlock() removed.
+   * 9: Added deleteBlockPool method
    */
-  public static final long versionID = 6L;
+  public static final long versionID = 9L;
 
   /** Return the visible length of a replica. */
-  long getReplicaVisibleLength(Block b) throws IOException;
+  long getReplicaVisibleLength(ExtendedBlock b) throws IOException;
+  
+  /**
+   * Refresh the list of federated namenodes from the updated configuration.
+   * Adds new namenodes and stops the deleted ones.
+   * 
+   * @throws IOException on error
+   **/
+  void refreshNamenodes() throws IOException;
+
+  /**
+   * Delete the block pool directory. If force is false, the directory is
+   * deleted only if it is empty; if force is true, it is deleted along with
+   * its contents.
+   * 
+   * @param bpid Blockpool id to be deleted.
+   * @param force If false blockpool directory is deleted only if it is empty 
+   *          i.e. if it doesn't contain any block files, otherwise it is 
+   *          deleted along with its contents.
+   * @throws IOException on error
+   */
+  void deleteBlockPool(String bpid, boolean force) throws IOException; 
 }

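A sketch of how an admin-side caller might drive the two new RPCs; the proxy wiring is an assumption (RPC.getProxy is standard Hadoop IPC, but DFSAdmin's real plumbing is not shown in this commit) and the block pool id is invented:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    ClientDatanodeProtocol datanode = RPC.getProxy(
        ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID,
        new InetSocketAddress("dn1.example.com", 50020), new Configuration());
    // Re-read config: new namenodes are added, removed ones are stopped.
    datanode.refreshNamenodes();
    // force=false refuses to delete a pool directory that still holds blocks.
    datanode.deleteBlockPool("BP-1-192.168.0.1-1300000000000", false);
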
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu May  5 05:40:07 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 66: Add getAdditionalDatanode(..)
+   * 67: Add block pool ID to Block
    */
-  public static final long versionID = 66L;
+  public static final long versionID = 67L;
   
   ///////////////////////////////////////
   // File contents
@@ -126,7 +126,7 @@ public interface ClientProtocol extends 
    * <p>
    * Blocks have a maximum size.  Clients that intend to create
    * multi-block files must also use 
-   * {@link #addBlock(String, String, Block, DatanodeInfo[])}
+   * {@link #addBlock(String, String, ExtendedBlock, DatanodeInfo[])}
    *
    * @param src path of the file being created.
    * @param masked masked permission.
@@ -259,7 +259,7 @@ public interface ClientProtocol extends 
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
-  public void abandonBlock(Block b, String src, String holder)
+  public void abandonBlock(ExtendedBlock b, String src, String holder)
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException;
 
@@ -293,7 +293,7 @@ public interface ClientProtocol extends 
    * @throws IOException If an I/O error occurred
    */
   public LocatedBlock addBlock(String src, String clientName,
-      @Nullable Block previous, @Nullable DatanodeInfo[] excludeNodes)
+      @Nullable ExtendedBlock previous, @Nullable DatanodeInfo[] excludeNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;
@@ -316,7 +316,7 @@ public interface ClientProtocol extends 
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
-  public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
       final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName
       ) throws AccessControlException, FileNotFoundException,
@@ -329,7 +329,7 @@ public interface ClientProtocol extends 
    * The function returns whether the file has been closed successfully.
    * If the function returns false, the caller should try again.
    * 
-   * close() also commits the last block of the file by reporting
+   * close() also commits the last block of file by reporting
    * to the name-node the actual generation stamp and the length
    * of the block that the client has transmitted to data-nodes.
    *
@@ -344,7 +344,7 @@ public interface ClientProtocol extends 
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink 
    * @throws IOException If an I/O error occurred
    */
-  public boolean complete(String src, String clientName, Block last)
+  public boolean complete(String src, String clientName, ExtendedBlock last)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException;
 
@@ -554,6 +554,8 @@ public interface ClientProtocol extends 
    * <li> [3] contains number of under replicated blocks in the system.</li>
    * <li> [4] contains number of blocks with a corrupt replica. </li>
    * <li> [5] contains number of blocks without any good replicas left. </li>
+   * <li> [6] contains the total used space of the block pool. </li>
    * </ul>
    * Use public constants like {@link #GET_STATS_CAPACITY_IDX} in place of 
    * actual numbers to index into the array.
@@ -854,8 +856,8 @@ public interface ClientProtocol extends 
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  public LocatedBlock updateBlockForPipeline(Block block, String clientName) 
-      throws IOException;
+  public LocatedBlock updateBlockForPipeline(ExtendedBlock block,
+      String clientName) throws IOException;
 
   /**
    * Update a pipeline for a block under construction
@@ -866,8 +868,8 @@ public interface ClientProtocol extends 
    * @param newNodes datanodes in the pipeline
    * @throws IOException if any error occurs
    */
-  public void updatePipeline(String clientName, Block oldBlock, 
-      Block newBlock, DatanodeID[] newNodes)
+  public void updatePipeline(String clientName, ExtendedBlock oldBlock, 
+      ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException;
 
   /**

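Most signature changes in ClientProtocol simply swap Block for ExtendedBlock. From the usages visible in this commit (the ExtendedBlock(bpid, block) constructor in LocatedBlock.java below, getBlockPoolId() in BlockTokenSecretManager.java), the type is essentially a Block qualified by its owning pool. A rough sketch of that implied shape, not the committed class:

    // Names beyond those used in this diff are assumptions.
    class ExtendedBlockSketch {
      private String poolId;  // which block pool owns the block
      private Block block;    // blockId, numBytes, generationStamp

      ExtendedBlockSketch(String poolId, Block block) {
        this.poolId = poolId;
        this.block = block;
      }
      String getBlockPoolId() { return poolId; }
      long getBlockId() { return block.getBlockId(); }
    }
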
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Thu May  5 05:40:07 2011
@@ -51,10 +51,11 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 22:
-   *    Add a new feature to replace datanode on failure.
+   * Version 23:
+   *    Changed the protocol methods to use ExtendedBlock instead
+   *    of Block.
    */
-  public static final int DATA_TRANSFER_VERSION = 22;
+  public static final int DATA_TRANSFER_VERSION = 23;
 
   /** Operation */
   public enum Op {
@@ -238,7 +239,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_READ_BLOCK */
-    public static void opReadBlock(DataOutputStream out, Block blk,
+    public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
         long blockOffset, long blockLen, String clientName,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
@@ -253,7 +254,7 @@ public interface DataTransferProtocol {
     }
     
     /** Send OP_WRITE_BLOCK */
-    public static void opWriteBlock(DataOutputStream out, Block blk,
+    public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
         int pipelineSize, BlockConstructionStage stage, long newGs,
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -277,7 +278,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send {@link Op#TRANSFER_BLOCK} */
-    public static void opTransferBlock(DataOutputStream out, Block blk,
+    public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
         String client, DatanodeInfo[] targets,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.TRANSFER_BLOCK);
@@ -291,7 +292,7 @@ public interface DataTransferProtocol {
 
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
-        Block blk, String storageId, DatanodeInfo src,
+        ExtendedBlock blk, String storageId, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.REPLACE_BLOCK);
 
@@ -303,7 +304,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_COPY_BLOCK */
-    public static void opCopyBlock(DataOutputStream out, Block blk,
+    public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
       op(out, Op.COPY_BLOCK);
@@ -314,7 +315,7 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_BLOCK_CHECKSUM */
-    public static void opBlockChecksum(DataOutputStream out, Block blk,
+    public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
       op(out, Op.BLOCK_CHECKSUM);
@@ -377,7 +378,7 @@ public interface DataTransferProtocol {
 
     /** Receive OP_READ_BLOCK */
     private void opReadBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final long offset = in.readLong();
       final long length = in.readLong();
@@ -390,13 +391,13 @@ public interface DataTransferProtocol {
     /**
      * Abstract OP_READ_BLOCK method. Read a block.
      */
-    protected abstract void opReadBlock(DataInputStream in, Block blk,
+    protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
         long offset, long length, String client,
         Token<BlockTokenIdentifier> blockToken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       final BlockConstructionStage stage = 
@@ -418,7 +419,7 @@ public interface DataTransferProtocol {
      * Abstract OP_WRITE_BLOCK method. 
      * Write a block.
      */
-    protected abstract void opWriteBlock(DataInputStream in, Block blk,
+    protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
         int pipelineSize, BlockConstructionStage stage, long newGs,
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
@@ -426,7 +427,7 @@ public interface DataTransferProtocol {
 
     /** Receive {@link Op#TRANSFER_BLOCK} */
     private void opTransferBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final String client = Text.readString(in);
       final DatanodeInfo targets[] = readDatanodeInfos(in);
@@ -440,14 +441,14 @@ public interface DataTransferProtocol {
      * For {@link BlockConstructionStage#TRANSFER_RBW}
      * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
      */
-    protected abstract void opTransferBlock(DataInputStream in, Block blk,
+    protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
         String client, DatanodeInfo[] targets,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final String sourceId = Text.readString(in); // read del hint
       final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
@@ -461,12 +462,12 @@ public interface DataTransferProtocol {
      * It is used for balancing purpose; send to a destination
      */
     protected abstract void opReplaceBlock(DataInputStream in,
-        Block blk, String sourceId, DatanodeInfo src,
+        ExtendedBlock blk, String sourceId, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -477,13 +478,13 @@ public interface DataTransferProtocol {
      * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
      * a proxy source.
      */
-    protected abstract void opCopyBlock(DataInputStream in, Block blk,
+    protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
-      final Block blk = new Block();
+      final ExtendedBlock blk = new ExtendedBlock();
       blk.readId(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
@@ -495,7 +496,7 @@ public interface DataTransferProtocol {
      * Get the checksum of a block 
      */
     protected abstract void opBlockChecksum(DataInputStream in,
-        Block blk, Token<BlockTokenIdentifier> blockToken)
+        ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
     /** Read an array of {@link DatanodeInfo} */

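The Sender/Receiver pairs above are deliberately symmetric: every field the Sender writes after the op code, the Receiver reads back in the same order, so widening Block to ExtendedBlock (whose writeId/readId now carry the pool id) changes the wire layout and forces the DATA_TRANSFER_VERSION bump. A generic illustration of that symmetry, not HDFS code; the op byte and field values are invented:

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeByte(81);                       // op code, e.g. READ_BLOCK
    Text.writeString(out, "BP-1-192.168.0.1-1300000000000"); // pool id first
    out.writeLong(42L);                      // then block id
    out.flush();

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()));
    in.readByte();                           // the receiver must read the
    Text.readString(in);                     // same fields in the same order,
    in.readLong();                           // or the stream desynchronizes
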
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Thu May  5 05:40:07 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -76,6 +77,18 @@ public class DatanodeID implements Writa
     this.ipcPort = ipcPort;
   }
   
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public void setInfoPort(int infoPort) {
+    this.infoPort = infoPort;
+  }
+  
+  public void setIpcPort(int ipcPort) {
+    this.ipcPort = ipcPort;
+  }
+  
   /**
    * @return hostname:portNumber.
    */

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Thu May  5 05:40:07 2011
@@ -24,6 +24,7 @@ import java.util.Date;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -48,6 +49,7 @@ public class DatanodeInfo extends Datano
   protected long capacity;
   protected long dfsUsed;
   protected long remaining;
+  protected long blockPoolUsed;
   protected long lastUpdate;
   protected int xceiverCount;
   protected String location = NetworkTopology.DEFAULT_RACK;
@@ -89,6 +91,7 @@ public class DatanodeInfo extends Datano
     this.capacity = from.getCapacity();
     this.dfsUsed = from.getDfsUsed();
     this.remaining = from.getRemaining();
+    this.blockPoolUsed = from.getBlockPoolUsed();
     this.lastUpdate = from.getLastUpdate();
     this.xceiverCount = from.getXceiverCount();
     this.location = from.getNetworkLocation();
@@ -101,6 +104,7 @@ public class DatanodeInfo extends Datano
     this.capacity = 0L;
     this.dfsUsed = 0L;
     this.remaining = 0L;
+    this.blockPoolUsed = 0L;
     this.lastUpdate = 0L;
     this.xceiverCount = 0;
     this.adminState = null;    
@@ -118,6 +122,9 @@ public class DatanodeInfo extends Datano
   /** The used space by the data node. */
   public long getDfsUsed() { return dfsUsed; }
 
+  /** The used space by the block pool on data node. */
+  public long getBlockPoolUsed() { return blockPoolUsed; }
+
   /** The used space by the data node. */
   public long getNonDfsUsed() { 
     long nonDFSUsed = capacity - dfsUsed - remaining;
@@ -126,23 +133,20 @@ public class DatanodeInfo extends Datano
 
   /** The used space by the data node as percentage of present capacity */
   public float getDfsUsedPercent() { 
-    if (capacity <= 0) {
-      return 100;
-    }
-
-    return ((float)dfsUsed * 100.0f)/(float)capacity; 
+    return DFSUtil.getPercentUsed(dfsUsed, capacity);
   }
 
   /** The raw free space. */
   public long getRemaining() { return remaining; }
 
+  /** Used space by the block pool as percentage of present capacity */
+  public float getBlockPoolUsedPercent() {
+    return DFSUtil.getPercentUsed(blockPoolUsed, capacity);
+  }
+  
   /** The remaining space as percentage of configured capacity. */
   public float getRemainingPercent() { 
-    if (capacity <= 0) {
-      return 0;
-    }
-
-    return ((float)remaining * 100.0f)/(float)capacity; 
+    return DFSUtil.getPercentRemaining(remaining, capacity);
   }
 
   /** The time when this information was accurate. */
@@ -161,6 +165,11 @@ public class DatanodeInfo extends Datano
     this.remaining = remaining; 
   }
 
+  /** Sets block pool used space */
+  public void setBlockPoolUsed(long bpUsed) { 
+    this.blockPoolUsed = bpUsed; 
+  }
+
   /** Sets time when this information was accurate. */
   public void setLastUpdate(long lastUpdate) { 
     this.lastUpdate = lastUpdate; 
@@ -342,6 +351,7 @@ public class DatanodeInfo extends Datano
     out.writeLong(capacity);
     out.writeLong(dfsUsed);
     out.writeLong(remaining);
+    out.writeLong(blockPoolUsed);
     out.writeLong(lastUpdate);
     out.writeInt(xceiverCount);
     Text.writeString(out, location);
@@ -359,6 +369,7 @@ public class DatanodeInfo extends Datano
     this.capacity = in.readLong();
     this.dfsUsed = in.readLong();
     this.remaining = in.readLong();
+    this.blockPoolUsed = in.readLong();
     this.lastUpdate = in.readLong();
     this.xceiverCount = in.readInt();
     this.location = Text.readString(in);

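The new blockPoolUsed counter mirrors the existing dfsUsed accounting, with the percentage math delegated to the DFSUtil helpers added earlier in this commit. A worked example; the values are invented:

    long capacity = 4L << 40;  // 4 TB
    long bpUsed   = 1L << 40;  // 1 TB used by this block pool
    float pct = DFSUtil.getPercentUsed(bpUsed, capacity);  // 25.0f
    // A non-positive capacity reports 100% used, by convention.
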
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Thu May  5 05:40:07 2011
@@ -88,14 +88,14 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -34;
+  public static final int LAYOUT_VERSION = -36;
   // Current version: 
-  // -31, -32 and -33 are reserved for 0.20.203, 0.20.204 and 0.22.
-  // -34: persistent transaction IDs
+  // -35: Adding support for block pools and multiple namenodes
+  // -36: persistent transaction IDs
 
   // Record of version numbers for specific changes:
   // Version where the edits log and image stored txn ID information
-  public static final int FIRST_STORED_TXIDS_VERSION = -34;
+  public static final int FIRST_STORED_TXIDS_VERSION = -36;
   // Version where the edits log and image file names are based on txn IDs
-  public static final int FIRST_TXNID_BASED_LAYOUT_VERSION = -35;
+  public static final int FIRST_TXNID_BASED_LAYOUT_VERSION = -37;
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Thu May  5 05:40:07 2011
@@ -42,7 +42,7 @@ public class LocatedBlock implements Wri
        });
   }
 
-  private Block b;
+  private ExtendedBlock b;
   private long offset;  // offset of the first byte of the block in the file
   private DatanodeInfo[] locs;
   // corrupt flag is true if all of the replicas of a block are corrupt.
@@ -51,27 +51,23 @@ public class LocatedBlock implements Wri
   private boolean corrupt;
   private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
 
-  /**
-   */
   public LocatedBlock() {
-    this(new Block(), new DatanodeInfo[0], 0L, false);
+    this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
   }
 
-  /**
-   */
-  public LocatedBlock(Block b, DatanodeInfo[] locs) {
+  public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
+    this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
+  }
+
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     this(b, locs, -1, false); // startOffset is unknown
   }
 
-  /**
-   */
-  public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset) {
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
     this(b, locs, startOffset, false);
   }
 
-  /**
-   */
-  public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset, 
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset, 
                       boolean corrupt) {
     this.b = b;
     this.offset = startOffset;
@@ -93,7 +89,7 @@ public class LocatedBlock implements Wri
 
   /**
    */
-  public Block getBlock() {
+  public ExtendedBlock getBlock() {
     return b;
   }
 
@@ -141,7 +137,7 @@ public class LocatedBlock implements Wri
     blockToken.readFields(in);
     this.corrupt = in.readBoolean();
     offset = in.readLong();
-    this.b = new Block();
+    this.b = new ExtendedBlock();
     b.readFields(in);
     int count = in.readInt();
     this.locs = new DatanodeInfo[count];

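Callers that still hold a plain Block plus the pool id can use the new (bpid, Block, locs) convenience constructor; the other overloads take an ExtendedBlock directly. A usage sketch with invented values:

    Block b = new Block(42L /*blockId*/, 1024L /*numBytes*/, 7L /*genStamp*/);
    DatanodeInfo[] locs = new DatanodeInfo[0];
    LocatedBlock lb =
        new LocatedBlock("BP-1-192.168.0.1-1300000000000", b, locs);
    ExtendedBlock eb = lb.getBlock();  // carries both pool id and block id
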
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Thu May  5 05:40:07 2011
@@ -37,19 +37,21 @@ public class BlockTokenIdentifier extend
   private long expiryDate;
   private int keyId;
   private String userId;
+  private String blockPoolId;
   private long blockId;
   private EnumSet<AccessMode> modes;
 
   private byte [] cache;
   
   public BlockTokenIdentifier() {
-    this(null, 0, EnumSet.noneOf(AccessMode.class));
+    this(null, null, 0, EnumSet.noneOf(AccessMode.class));
   }
 
-  public BlockTokenIdentifier(String userId, long blockId,
+  public BlockTokenIdentifier(String userId, String bpid, long blockId,
       EnumSet<AccessMode> modes) {
     this.cache = null;
     this.userId = userId;
+    this.blockPoolId = bpid;
     this.blockId = blockId;
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
   }
@@ -62,7 +64,8 @@ public class BlockTokenIdentifier extend
   @Override
   public UserGroupInformation getUser() {
     if (userId == null || "".equals(userId)) {
-      return UserGroupInformation.createRemoteUser(Long.toString(blockId));
+      String user = blockPoolId + ":" + Long.toString(blockId);
+      return UserGroupInformation.createRemoteUser(user);
     }
     return UserGroupInformation.createRemoteUser(userId);
   }
@@ -89,6 +92,10 @@ public class BlockTokenIdentifier extend
     return userId;
   }
 
+  public String getBlockPoolId() {
+    return blockPoolId;
+  }
+
   public long getBlockId() {
     return blockId;
   }
@@ -101,6 +108,7 @@ public class BlockTokenIdentifier extend
   public String toString() {
     return "block_token_identifier (expiryDate=" + this.getExpiryDate()
         + ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+        + ", blockPoolId=" + this.getBlockPoolId()
         + ", blockId=" + this.getBlockId() + ", access modes="
         + this.getAccessModes() + ")";
   }
@@ -117,7 +125,9 @@ public class BlockTokenIdentifier extend
     if (obj instanceof BlockTokenIdentifier) {
       BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
       return this.expiryDate == that.expiryDate && this.keyId == that.keyId
-          && isEqual(this.userId, that.userId) && this.blockId == that.blockId
+          && isEqual(this.userId, that.userId) 
+          && isEqual(this.blockPoolId, that.blockPoolId)
+          && this.blockId == that.blockId
           && isEqual(this.modes, that.modes);
     }
     return false;
@@ -126,7 +136,8 @@ public class BlockTokenIdentifier extend
   /** {@inheritDoc} */
   public int hashCode() {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
-        ^ (userId == null ? 0 : userId.hashCode());
+        ^ (userId == null ? 0 : userId.hashCode())
+        ^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -134,6 +145,7 @@ public class BlockTokenIdentifier extend
     expiryDate = WritableUtils.readVLong(in);
     keyId = WritableUtils.readVInt(in);
     userId = WritableUtils.readString(in);
+    blockPoolId = WritableUtils.readString(in);
     blockId = WritableUtils.readVLong(in);
     int length = WritableUtils.readVInt(in);
     for (int i = 0; i < length; i++) {
@@ -145,6 +157,7 @@ public class BlockTokenIdentifier extend
     WritableUtils.writeVLong(out, expiryDate);
     WritableUtils.writeVInt(out, keyId);
     WritableUtils.writeString(out, userId);
+    WritableUtils.writeString(out, blockPoolId);
     WritableUtils.writeVLong(out, blockId);
     WritableUtils.writeVInt(out, modes.size());
     for (AccessMode aMode : modes) {

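Note that blockPoolId is serialized between userId and blockId, so tokens written after this change are not wire-compatible with older readers; write and readFields must stay in lock-step. A round-trip sketch; AccessMode is assumed to be BlockTokenSecretManager.AccessMode as used elsewhere in this package, and the pool id is invented:

    import java.util.EnumSet;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    BlockTokenIdentifier id = new BlockTokenIdentifier("hdfs",
        "BP-1-192.168.0.1-1300000000000", 42L,
        EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    DataOutputBuffer out = new DataOutputBuffer();
    id.write(out);                    // pool id now precedes block id
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    BlockTokenIdentifier copy = new BlockTokenIdentifier();
    copy.readFields(in);              // round-trips the pool id intact
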
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Thu May  5 05:40:07 2011
@@ -31,7 +31,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -174,7 +174,7 @@ public class BlockTokenSecretManager ext
   }
 
   /** Generate an block token for current user */
-  public Token<BlockTokenIdentifier> generateToken(Block block,
+  public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
       EnumSet<AccessMode> modes) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String userID = (ugi == null ? null : ugi.getShortUserName());
@@ -182,10 +182,10 @@ public class BlockTokenSecretManager ext
   }
 
   /** Generate a block token for a specified user */
-  public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
-      EnumSet<AccessMode> modes) throws IOException {
+  public Token<BlockTokenIdentifier> generateToken(String userId,
+      ExtendedBlock block, EnumSet<AccessMode> modes) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
-        .getBlockId(), modes);
+        .getBlockPoolId(), block.getBlockId(), modes);
     return new Token<BlockTokenIdentifier>(id, this);
   }
 
@@ -194,8 +194,8 @@ public class BlockTokenSecretManager ext
    * method doesn't check if token password is correct. It should be used only
    * when token password has already been verified (e.g., in the RPC layer).
    */
-  public void checkAccess(BlockTokenIdentifier id, String userId, Block block,
-      AccessMode mode) throws InvalidToken {
+  public void checkAccess(BlockTokenIdentifier id, String userId,
+      ExtendedBlock block, AccessMode mode) throws InvalidToken {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Checking access for user=" + userId + ", block=" + block
           + ", access mode=" + mode + " using " + id.toString());
@@ -204,6 +204,10 @@ public class BlockTokenSecretManager ext
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't belong to user " + userId);
     }
+    if (!id.getBlockPoolId().equals(block.getBlockPoolId())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't apply to block " + block);
+    }
     if (id.getBlockId() != block.getBlockId()) {
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't apply to block " + block);
@@ -220,7 +224,7 @@ public class BlockTokenSecretManager ext
 
   /** Check if access should be allowed. userID is not checked if null */
   public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
-      Block block, AccessMode mode) throws InvalidToken {
+      ExtendedBlock block, AccessMode mode) throws InvalidToken {
     BlockTokenIdentifier id = new BlockTokenIdentifier();
     try {
       id.readFields(new DataInputStream(new ByteArrayInputStream(token

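With the pool id folded into the token identity, checkAccess now rejects a token minted in one pool when presented with a same-numbered block from another. A sketch; secretManager is an existing BlockTokenSecretManager, the ExtendedBlock(bpid, block) constructor is the one used in LocatedBlock.java above, and all values are invented:

    Block b = new Block(42L, 1024L, 7L);
    ExtendedBlock inPoolA = new ExtendedBlock("BP-A-10.0.0.1-1", b);
    ExtendedBlock inPoolB = new ExtendedBlock("BP-B-10.0.0.2-2", b);

    Token<BlockTokenIdentifier> t =
        secretManager.generateToken(inPoolA, EnumSet.of(AccessMode.READ));
    secretManager.checkAccess(t, null, inPoolA, AccessMode.READ); // passes
    secretManager.checkAccess(t, null, inPoolB, AccessMode.READ); // InvalidToken
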

