hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1374355 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/...
Date Fri, 17 Aug 2012 16:52:08 GMT
Author: atm
Date: Fri Aug 17 16:52:07 2012
New Revision: 1374355

URL: http://svn.apache.org/viewvc?rev=1374355&view=rev
Log:
HDFS-3672. Expose disk-location information for blocks to enable better scheduling. Contributed by Andrew Wang.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 17 16:52:07 2012
@@ -402,6 +402,9 @@ Branch-2 ( Unreleased changes )
     HDFS-2963. Console Output is confusing while executing metasave
     (dfsadmin command). (Andrew Wang via eli)
 
+    HDFS-3672. Expose disk-location information for blocks to enable better
+    scheduling. (Andrew Wang via atm)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume
+ * location information for each replica.
+ * <p>
+ * The VolumeId array is copied both on construction and in
+ * {@link #getVolumeIds()}, so callers cannot change the stored ids through
+ * an array reference they hold.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public class BlockStorageLocation extends BlockLocation {
+
+  private final VolumeId[] volumeIds;
+
+  /**
+   * Creates a BlockStorageLocation from an existing BlockLocation.
+   *
+   * @param loc location whose names, hosts, topology paths, offset, length
+   *          and corrupt flag are copied into this instance
+   * @param volumeIds VolumeIds corresponding to the block's replicas; may be
+   *          null
+   * @throws IOException if an accessor on <code>loc</code> throws one
+   */
+  public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds)
+      throws IOException {
+    // Initialize with data from passed in BlockLocation
+    super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc
+        .getOffset(), loc.getLength(), loc.isCorrupt());
+    // Defensive copy: later writes to the caller's array must not leak in
+    this.volumeIds = (volumeIds == null) ? null : volumeIds.clone();
+  }
+
+  /**
+   * Gets the list of {@link VolumeId} corresponding to the block's replicas.
+   *
+   * @return a copy of the VolumeIds for the block's replicas, or null if
+   *         this instance was constructed with null
+   */
+  public VolumeId[] getVolumeIds() {
+    return (volumeIds == null) ? null : volumeIds.clone();
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+/**
+ * A {@link BlockLocation} that additionally carries the {@link LocatedBlock}
+ * it was built from, allowing more detailed queries to the datanode about
+ * the block.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HdfsBlockLocation extends BlockLocation {
+
+  /** The located block this location was built from. */
+  private final LocatedBlock block;
+
+  /**
+   * Copies every field of <code>loc</code> and remembers <code>block</code>.
+   *
+   * @param loc location whose names, hosts, topology paths, offset, length
+   *          and corrupt flag are copied into this instance
+   * @param block the located block to associate with this location
+   * @throws IOException if an accessor on <code>loc</code> throws one
+   */
+  public HdfsBlockLocation(BlockLocation loc, LocatedBlock block)
+      throws IOException {
+    super(loc.getNames(),
+        loc.getHosts(),
+        loc.getTopologyPaths(),
+        loc.getOffset(),
+        loc.getLength(),
+        loc.isCorrupt());
+    this.block = block;
+  }
+
+  /**
+   * @return the {@link LocatedBlock} supplied at construction time
+   */
+  public LocatedBlock getLocatedBlock() {
+    return block;
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * HDFS-specific volume identifier which implements {@link VolumeId}. Can be
+ * used to differentiate between the data directories on a single datanode. This
+ * identifier is only unique on a per-datanode basis.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public class HdfsVolumeId implements VolumeId {
+
+  private final byte id;
+  private final boolean isValid;
+
+  /**
+   * @param id the identifier byte; only this value takes part in equality
+   * @param isValid whether this identifier is valid (see VolumeId#isValid())
+   */
+  public HdfsVolumeId(byte id, boolean isValid) {
+    this.id = id;
+    this.isValid = isValid;
+  }
+
+  @Override
+  public boolean isValid() {
+    return isValid;
+  }
+
+  /**
+   * Orders volume ids by their {@link #hashCode()}.
+   * <p>
+   * The hash codes are compared rather than subtracted, because subtraction
+   * can overflow and return the wrong sign when <code>arg0</code> is another
+   * {@link VolumeId} implementation with a large or negative hash code.
+   */
+  @Override
+  public int compareTo(VolumeId arg0) {
+    int thisHash = hashCode();
+    int thatHash = arg0.hashCode();
+    if (thisHash < thatHash) {
+      return -1;
+    }
+    return (thisHash == thatHash) ? 0 : 1;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(id).toHashCode();
+  }
+
+  /**
+   * Two HdfsVolumeIds are equal when their id bytes are equal; the validity
+   * flag is not compared.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    HdfsVolumeId that = (HdfsVolumeId) obj;
+    return new EqualsBuilder().append(this.id, that.id).isEquals();
+  }
+
+  @Override
+  public String toString() {
+    return Byte.toString(id);
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Opaque interface that identifies a disk location. Implementations should
+ * implement {@link Comparable} and override both {@link Object#equals(Object)}
+ * and {@link Object#hashCode()}.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public interface VolumeId extends Comparable<VolumeId> {
+
+  /**
+   * Indicates if the disk identifier is valid. Invalid identifiers indicate
+   * that the block was not present, or the location could otherwise not be
+   * determined.
+   *
+   * @return true if the disk identifier is valid
+   */
+  boolean isValid();
+
+  /** Orders this volume identifier relative to <code>arg0</code>. */
+  @Override
+  int compareTo(VolumeId arg0);
+
+  /**
+   * Redeclared to document that implementations should override it; the
+   * compiler still accepts the implementation inherited from Object.
+   */
+  @Override
+  int hashCode();
+
+  /**
+   * Redeclared to document that implementations should override it; the
+   * compiler still accepts the implementation inherited from Object.
+   */
+  @Override
+  boolean equals(Object obj);
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.HdfsVolumeId;
+import org.apache.hadoop.fs.VolumeId;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Helpers for querying datanodes for the volume (data directory) holding
+ * each replica of a set of blocks, and for combining the answers with the
+ * block locations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class BlockStorageLocationUtil {
+
+  private static final Log LOG = LogFactory
+      .getLog(BlockStorageLocationUtil.class);
+
+  /**
+   * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
+   * of datanodes and blocks, one callable per datanode.
+   *
+   * @param conf
+   *          Configuration used to create the RPC proxies
+   * @param datanodeBlocks
+   *          Map of datanodes to block replicas at each datanode
+   * @param timeout
+   *          timeout passed to each RPC proxy
+   * @param connectToDnViaHostname
+   *          whether to connect to datanodes via their hostname
+   * @return callables Used to query each datanode for location information on
+   *         the block replicas at the datanode, in the iteration order of
+   *         <code>datanodeBlocks</code>
+   */
+  private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
+      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+      int timeout, boolean connectToDnViaHostname) {
+    // Construct the callables, one per datanode
+    List<VolumeBlockLocationCallable> callables =
+        new ArrayList<VolumeBlockLocationCallable>();
+    for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
+        .entrySet()) {
+      // Construct RPC parameters
+      DatanodeInfo datanode = entry.getKey();
+      List<LocatedBlock> locatedBlocks = entry.getValue();
+      List<ExtendedBlock> extendedBlocks =
+          new ArrayList<ExtendedBlock>(locatedBlocks.size());
+      List<Token<BlockTokenIdentifier>> dnTokens =
+          new ArrayList<Token<BlockTokenIdentifier>>(
+          locatedBlocks.size());
+      for (LocatedBlock b : locatedBlocks) {
+        extendedBlocks.add(b.getBlock());
+        dnTokens.add(b.getBlockToken());
+      }
+      VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
+          conf, datanode, extendedBlocks, dnTokens, timeout,
+          connectToDnViaHostname);
+      callables.add(callable);
+    }
+    return callables;
+  }
+
+  /**
+   * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
+   * making one RPC to each datanode. These RPCs are made in parallel using a
+   * threadpool.
+   * <p>
+   * The returned list has one entry per datanode, in the iteration order of
+   * <code>datanodeBlocks</code>. An entry is null if the RPC to that datanode
+   * failed or did not complete before the timeout. If this thread is
+   * interrupted while waiting for the RPCs, an empty list is returned and the
+   * thread's interrupt status is restored.
+   *
+   * @param conf
+   *          Configuration used to create the RPC proxies
+   * @param datanodeBlocks
+   *          Map of datanodes to the blocks present on the DN
+   * @param poolsize
+   *          number of threads used to issue the RPCs
+   * @param timeout
+   *          how long to wait for all RPCs, in seconds; the same value is
+   *          also passed to each RPC proxy. NOTE(review): confirm that
+   *          DFSUtil#createClientDatanodeProtocolProxy expects the same unit.
+   * @param connectToDnViaHostname
+   *          whether to connect to datanodes via their hostname
+   * @return metadatas List of block metadata for each datanode, specifying
+   *         volume locations for each block
+   * @throws InvalidBlockTokenException
+   *           if client does not have read access on a requested block
+   * @throws UnsupportedOperationException
+   *           if a datanode does not support the getHdfsBlocksMetadata API
+   */
+  static List<HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
+      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+      int poolsize, int timeout, boolean connectToDnViaHostname)
+      throws InvalidBlockTokenException {
+
+    List<VolumeBlockLocationCallable> callables =
+        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeout,
+            connectToDnViaHostname);
+
+    // Use a thread pool to execute the Callables in parallel. When the
+    // timeout expires, invokeAll cancels every task that has not completed.
+    List<Future<HdfsBlocksMetadata>> futures =
+        new ArrayList<Future<HdfsBlocksMetadata>>();
+    boolean interrupted = false;
+    ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
+    try {
+      futures = executor.invokeAll(callables, timeout, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // No results are available; fall through with an empty list of
+      // futures and restore the interrupt status before returning
+      interrupted = true;
+    } finally {
+      executor.shutdown();
+    }
+
+    // Collect one result per datanode; null marks a DN we got no answer from
+    List<HdfsBlocksMetadata> metadatas =
+        new ArrayList<HdfsBlocksMetadata>(futures.size());
+    try {
+      for (int i = 0; i < futures.size(); i++) {
+        DatanodeInfo datanode = callables.get(i).getDatanodeInfo();
+        HdfsBlocksMetadata metadata = null;
+        try {
+          metadata = getMetadataOrNull(futures.get(i), datanode);
+        } catch (InterruptedException e) {
+          // Shouldn't happen, because invokeAll waits for all Futures to be
+          // ready
+          LOG.info("Interrupted while fetching HdfsBlocksMetadata");
+          interrupted = true;
+        }
+        metadatas.add(metadata);
+      }
+    } finally {
+      // Restore the interrupt status even if a fatal exception is rethrown
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return metadatas;
+  }
+
+  /**
+   * Waits for the result of one datanode's RPC.
+   *
+   * @param future
+   *          future returned by invokeAll for the datanode's callable
+   * @param datanode
+   *          the datanode queried, used for logging
+   * @return the datanode's metadata, or null if the task was cancelled or
+   *         failed with an exception other than those rethrown below
+   * @throws InvalidBlockTokenException
+   *           if the datanode rejected a block token
+   * @throws UnsupportedOperationException
+   *           if the datanode does not support the getHdfsBlocksMetadata API
+   * @throws InterruptedException
+   *           if interrupted while waiting for the result
+   */
+  private static HdfsBlocksMetadata getMetadataOrNull(
+      Future<HdfsBlocksMetadata> future, DatanodeInfo datanode)
+      throws InvalidBlockTokenException, InterruptedException {
+    try {
+      return future.get();
+    } catch (CancellationException e) {
+      // invokeAll cancelled this task because the timeout expired
+      LOG.info("Cancelled while waiting for datanode "
+          + datanode.getIpcAddr(false));
+    } catch (ExecutionException e) {
+      Throwable t = e.getCause();
+      if (t instanceof InvalidBlockTokenException) {
+        LOG.warn("Invalid access token when trying to retrieve "
+            + "information from datanode " + datanode.getIpcAddr(false));
+        throw (InvalidBlockTokenException) t;
+      } else if (t instanceof UnsupportedOperationException) {
+        LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
+            + " required #getHdfsBlocksMetadata() API");
+        throw (UnsupportedOperationException) t;
+      } else {
+        LOG.info("Failed to connect to datanode " +
+            datanode.getIpcAddr(false));
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Could not fetch information from datanode", t);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Group the per-replica {@link VolumeId} info returned from
+   * {@link #queryDatanodesForHdfsBlocksMetadata} to be associated
+   * with the corresponding {@link LocatedBlock}.
+   * <p>
+   * <code>metadatas</code> is matched by position against the iteration order
+   * of <code>datanodeBlocks</code>, so the map must iterate in the same order
+   * that was used to query the datanodes. Replicas for which no usable answer
+   * was received keep an invalid {@link HdfsVolumeId}.
+   *
+   * @param blocks
+   *          Original LocatedBlock array
+   * @param datanodeBlocks
+   *          Mapping from datanodes to the list of replicas on each datanode
+   * @param metadatas
+   *          VolumeId information for the replicas on each datanode
+   * @return blockVolumeIds per-replica VolumeId information associated with the
+   *         parent LocatedBlock
+   */
+  static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
+      List<LocatedBlock> blocks, Map<DatanodeInfo,
+      List<LocatedBlock>> datanodeBlocks, List<HdfsBlocksMetadata> metadatas) {
+
+    // Initialize mapping of ExtendedBlock to LocatedBlock.
+    // Used to associate results from DN RPCs to the parent LocatedBlock
+    Map<ExtendedBlock, LocatedBlock> extBlockToLocBlock =
+        new HashMap<ExtendedBlock, LocatedBlock>();
+    for (LocatedBlock b : blocks) {
+      extBlockToLocBlock.put(b.getBlock(), b);
+    }
+
+    // Initialize the mapping of blocks -> list of VolumeIds, one per replica
+    // This is filled out with real values from the DN RPCs
+    Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
+        new HashMap<LocatedBlock, List<VolumeId>>();
+    for (LocatedBlock b : blocks) {
+      ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
+      // Start off all IDs as invalid, fill it in later with results from RPCs
+      for (int i = 0; i < b.getLocations().length; i++) {
+        l.add(new HdfsVolumeId((byte)-1, false));
+      }
+      blockVolumeIds.put(b, l);
+    }
+
+    // Iterate through the list of metadatas (one per datanode).
+    // For each metadata, if it's valid, insert its volume location information
+    // into the Map returned to the caller
+    Iterator<HdfsBlocksMetadata> metadatasIter = metadatas.iterator();
+    Iterator<DatanodeInfo> datanodeIter = datanodeBlocks.keySet().iterator();
+    while (metadatasIter.hasNext()) {
+      HdfsBlocksMetadata metadata = metadatasIter.next();
+      DatanodeInfo datanode = datanodeIter.next();
+      // Check if metadata is valid
+      if (metadata == null) {
+        continue;
+      }
+      ExtendedBlock[] metaBlocks = metadata.getBlocks();
+      List<byte[]> metaVolumeIds = metadata.getVolumeIds();
+      List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
+      // Add VolumeId for each replica in the HdfsBlocksMetadata
+      for (int j = 0; j < metaBlocks.length; j++) {
+        int volumeIndex = metaVolumeIndexes.get(j);
+        ExtendedBlock extBlock = metaBlocks[j];
+        // Skip if block wasn't found, or not a valid index into metaVolumeIds
+        // Also skip if the DN responded with a block we didn't ask for
+        if (volumeIndex == Integer.MAX_VALUE
+            || volumeIndex < 0
+            || volumeIndex >= metaVolumeIds.size()
+            || !extBlockToLocBlock.containsKey(extBlock)) {
+          continue;
+        }
+        // Get the VolumeId by indexing into the list of VolumeIds
+        // provided by the datanode. Only the first byte is used, so skip
+        // empty ids rather than failing on them.
+        byte[] volumeIdBytes = metaVolumeIds.get(volumeIndex);
+        if (volumeIdBytes == null || volumeIdBytes.length == 0) {
+          continue;
+        }
+        HdfsVolumeId id = new HdfsVolumeId(volumeIdBytes[0], true);
+        // Find out which index we are in the LocatedBlock's replicas
+        LocatedBlock locBlock = extBlockToLocBlock.get(extBlock);
+        DatanodeInfo[] dnInfos = locBlock.getLocations();
+        int index = -1;
+        for (int k = 0; k < dnInfos.length; k++) {
+          if (dnInfos[k].equals(datanode)) {
+            index = k;
+            break;
+          }
+        }
+        if (index < 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Datanode responded with a block volume id we did" +
+                " not request, omitting.");
+          }
+          continue;
+        }
+        // Place VolumeId at the same index as the DN's index in the list of
+        // replicas
+        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+        volumeIds.set(index, id);
+      }
+    }
+    return blockVolumeIds;
+  }
+
+  /**
+   * Helper method to combine a list of {@link LocatedBlock} with associated
+   * {@link VolumeId} information to form a list of
+   * {@link BlockStorageLocation}.
+   *
+   * @param blocks
+   *          the located blocks to convert
+   * @param blockVolumeIds
+   *          per-replica VolumeIds for each block, as produced by
+   *          {@link #associateVolumeIdsWithBlocks}
+   * @return one BlockStorageLocation per converted block
+   * @throws IOException
+   *           if building a {@link BlockStorageLocation} fails
+   */
+  static BlockStorageLocation[] convertToVolumeBlockLocations(
+      List<LocatedBlock> blocks,
+      Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
+    // Construct the final return value of VolumeBlockLocation[]
+    BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+    List<BlockStorageLocation> volumeBlockLocs =
+        new ArrayList<BlockStorageLocation>(locations.length);
+    for (int i = 0; i < locations.length; i++) {
+      LocatedBlock locBlock = blocks.get(i);
+      List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+      BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
+          volumeIds.toArray(new VolumeId[0]));
+      volumeBlockLocs.add(bsLoc);
+    }
+    return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
+  }
+
+  /**
+   * Callable that sets up an RPC proxy to a datanode and queries it for
+   * volume location information for a list of ExtendedBlocks. Any proxy that
+   * was created is stopped when the call finishes.
+   */
+  private static class VolumeBlockLocationCallable implements
+    Callable<HdfsBlocksMetadata> {
+
+    private final Configuration configuration;
+    private final int timeout;
+    private final DatanodeInfo datanode;
+    private final List<ExtendedBlock> extendedBlocks;
+    private final List<Token<BlockTokenIdentifier>> dnTokens;
+    private final boolean connectToDnViaHostname;
+
+    VolumeBlockLocationCallable(Configuration configuration,
+        DatanodeInfo datanode, List<ExtendedBlock> extendedBlocks,
+        List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
+        boolean connectToDnViaHostname) {
+      this.configuration = configuration;
+      this.timeout = timeout;
+      this.datanode = datanode;
+      this.extendedBlocks = extendedBlocks;
+      this.dnTokens = dnTokens;
+      this.connectToDnViaHostname = connectToDnViaHostname;
+    }
+
+    public DatanodeInfo getDatanodeInfo() {
+      return datanode;
+    }
+
+    @Override
+    public HdfsBlocksMetadata call() throws Exception {
+      // Create the RPC proxy and make the RPC. Exceptions propagate to the
+      // caller, which receives them wrapped in an ExecutionException.
+      ClientDatanodeProtocol cdp = null;
+      try {
+        cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
+            timeout, connectToDnViaHostname);
+        return cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
+      } finally {
+        if (cdp != null) {
+          RPC.stopProxy(cdp);
+        }
+      }
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Aug 17 16:52:07 2012
@@ -45,8 +45,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
@@ -69,6 +67,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -80,6 +79,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -87,12 +87,14 @@ import org.apache.hadoop.fs.FileAlreadyE
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -102,6 +104,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@@ -120,8 +123,9 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -216,6 +220,9 @@ public class DFSClient implements java.i
     final FsPermission uMask;
     final boolean useLegacyBlockReader;
     final boolean connectToDnViaHostname;
+    final boolean getHdfsBlocksMetadataEnabled;
+    final int getFileBlockStorageLocationsNumThreads;
+    final int getFileBlockStorageLocationsTimeout;
 
     Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
@@ -268,6 +275,15 @@ public class DFSClient implements java.i
           DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
       connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
           DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+      getHdfsBlocksMetadataEnabled = conf.getBoolean(
+          DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
+          DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+      getFileBlockStorageLocationsNumThreads = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
+      getFileBlockStorageLocationsTimeout = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -942,7 +958,81 @@ public class DFSClient implements java.i
   public BlockLocation[] getBlockLocations(String src, long start, 
     long length) throws IOException, UnresolvedLinkException {
     LocatedBlocks blocks = getLocatedBlocks(src, start, length);
-    return DFSUtil.locatedBlocks2Locations(blocks);
+    BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+    HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
+    for (int i = 0; i < locations.length; i++) {
+      hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
+    }
+    return hdfsLocations;
+  }
+  
+  /**
+   * Get block location information about a list of {@link HdfsBlockLocation}.
+   * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
+   * get {@link BlockStorageLocation}s for blocks returned by
+   * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
+   * .
+   * 
+   * This is done by making a round of RPCs to the associated datanodes, asking
+   * for the volume of each block replica. The returned array of
+   * {@link BlockStorageLocation} exposes this information as a
+   * {@link VolumeId}.
+   * 
+   * @param blockLocations
+   *          target blocks on which to query volume location information
+   * @return volumeBlockLocations original block array augmented with additional
+   *         volume location information for each replica.
+   */
+  public BlockStorageLocation[] getBlockStorageLocations(
+      List<BlockLocation> blockLocations) throws IOException,
+      UnsupportedOperationException, InvalidBlockTokenException {
+    if (!getConf().getHdfsBlocksMetadataEnabled) {
+      throw new UnsupportedOperationException("Datanode-side support for " +
+          "getBlockStorageLocations() must also be enabled in the client " +
+          "configuration.");
+    }
+    // Downcast blockLocations and fetch out required LocatedBlock(s)
+    List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
+    for (BlockLocation loc : blockLocations) {
+      if (!(loc instanceof HdfsBlockLocation)) {
+        throw new ClassCastException("DFSClient#getBlockStorageLocations " +
+            "expected to be passed HdfsBlockLocations");
+      }
+      HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
+      blocks.add(hdfsLoc.getLocatedBlock());
+    }
+    
+    // Re-group the LocatedBlocks to be grouped by datanodes, with the values
+    // a list of the LocatedBlocks on the datanode.
+    Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 
+        new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
+    for (LocatedBlock b : blocks) {
+      for (DatanodeInfo info : b.getLocations()) {
+        if (!datanodeBlocks.containsKey(info)) {
+          datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
+        }
+        List<LocatedBlock> l = datanodeBlocks.get(info);
+        l.add(b);
+      }
+    }
+        
+    // Make RPCs to the datanodes to get volume locations for their replicas
+    List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
+        .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
+            getConf().getFileBlockStorageLocationsNumThreads,
+            getConf().getFileBlockStorageLocationsTimeout,
+            getConf().connectToDnViaHostname);
+    
+    // Regroup the returned VolumeId metadata to again be grouped by
+    // LocatedBlock rather than by datanode
+    Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
+        .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas);
+    
+    // Combine original BlockLocations with new VolumeId information
+    BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
+        .convertToVolumeBlockLocations(blocks, blockVolumeIds);
+
+    return volumeBlockLocations;
   }
   
   public DFSInputStream open(String src) 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Aug 17 16:52:07 2012
@@ -54,6 +54,12 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   public static final String  DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
   public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+  public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
+  public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
+  public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
+  public static final int     DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
+  public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout";
+  public static final int     DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60;
 
   // HA related configuration
   public static final String  DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
@@ -245,7 +251,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_DU_RESERVED_KEY = "dfs.datanode.du.reserved";
   public static final long    DFS_DATANODE_DU_RESERVED_DEFAULT = 0;
   public static final String  DFS_DATANODE_HANDLER_COUNT_KEY = "dfs.datanode.handler.count";
-  public static final int     DFS_DATANODE_HANDLER_COUNT_DEFAULT = 3;
+  public static final int     DFS_DATANODE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";
   public static final int     DFS_DATANODE_HTTP_DEFAULT_PORT = 50075;
   public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Fri Aug 17 16:52:07 2012
@@ -282,13 +282,25 @@ public class DFSUtil {
     if (blocks == null) {
       return new BlockLocation[0];
     }
-    int nrBlocks = blocks.locatedBlockCount();
+    return locatedBlocks2Locations(blocks.getLocatedBlocks());
+  }
+  
+  /**
+   * Convert a List<LocatedBlock> to BlockLocation[]
+   * @param blocks A List<LocatedBlock> to be converted
+   * @return converted array of BlockLocation
+   */
+  public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) {
+    if (blocks == null) {
+      return new BlockLocation[0];
+    }
+    int nrBlocks = blocks.size();
     BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
     if (nrBlocks == 0) {
       return blkLocations;
     }
     int idx = 0;
-    for (LocatedBlock blk : blocks.getLocatedBlocks()) {
+    for (LocatedBlock blk : blocks) {
       assert idx < nrBlocks : "Incorrect index";
       DatanodeInfo[] locations = blk.getLocations();
       String[] hosts = new String[locations.length];

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Aug 17 16:52:07 2012
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -31,6 +32,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -188,6 +192,36 @@ public class DistributedFileSystem exten
 
   }
 
+  /**
+   * Used to query storage location information for a list of blocks. This list
+   * of blocks is normally constructed via a series of calls to
+   * {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
+   * get the blocks for ranges of a file.
+   * 
+   * The returned array of {@link BlockStorageLocation} augments
+   * {@link BlockLocation} with a {@link VolumeId} per block replica. The
+   * VolumeId specifies the volume on the datanode on which the replica resides.
+   * The VolumeId has to be checked via {@link VolumeId#isValid()} before being
+   * used because volume information can be unavailable if the corresponding
+   * datanode is down or if the requested block is not found.
+   * 
+   * This API is unstable and disabled by default. It must be enabled on both
+   * the client and the datanodes by setting
+   * "dfs.datanode.hdfs-blocks-metadata.enabled" to true.
+   * 
+   * @param blocks
+   *          List of target BlockLocations to query volume location information
+   * @return volumeBlockLocations Augmented array of
+   *         {@link BlockStorageLocation}s containing additional volume location
+   *         information for each replica of each block.
+   */
+  @InterfaceStability.Unstable
+  public BlockStorageLocation[] getFileBlockStorageLocations(
+      List<BlockLocation> blocks) throws IOException, 
+      UnsupportedOperationException, InvalidBlockTokenException {
+    return dfs.getBlockStorageLocations(blocks);
+  }
+
   @Override
   public void setVerifyChecksum(boolean verifyChecksum) {
     this.verifyChecksum = verifyChecksum;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Aug 17 16:52:07 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -106,4 +107,21 @@ public interface ClientDatanodeProtocol 
    */
   BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
       Token<BlockTokenIdentifier> token) throws IOException;
+  
+  /**
+   * Retrieves volume location information about a list of blocks on a datanode.
+   * This is in the form of an opaque volume identifier for each configured
+   * data directory, which is not guaranteed to be the same across DN restarts.
+   * 
+   * @param blocks
+   *          list of blocks on the local datanode
+   * @param tokens
+   *          block access tokens corresponding to the requested blocks
+   * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with
+   *         data directories
+   * @throws IOException
+   *           if datanode is unreachable, or replica is not found on datanode
+   */
+  HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+      List<Token<BlockTokenIdentifier>> tokens) throws IOException; 
 }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Augments an array of blocks on a datanode with additional information about
+ * where the block is stored.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HdfsBlocksMetadata {
+  
+  /**
+   * Array of blocks
+   */
+  private final ExtendedBlock[] blocks;
+  
+  /**
+   * List of volume identifiers, in raw byte form
+   */
+  private final List<byte[]> volumeIds;
+  
+  /**
+   * List of indexes into <code>volumeIds</code>, one per block in
+   * <code>blocks</code>. A value of Integer.MAX_VALUE indicates that the
+   * block was not found.
+   */
+  private final List<Integer> volumeIndexes;
+
+  /**
+   * Constructs HdfsBlocksMetadata.
+   * 
+   * @param blocks
+   *          Array of blocks described
+   * @param volumeIds
+   *          List of potential volume identifiers, specifying volumes where 
+   *          blocks may be stored
+   * @param volumeIndexes
+   *          Indexes into the list of volume identifiers, one per block
+   */
+  public HdfsBlocksMetadata(ExtendedBlock[] blocks, List<byte[]> volumeIds, 
+      List<Integer> volumeIndexes) {
+    this.blocks = blocks;
+    this.volumeIds = volumeIds;
+    this.volumeIndexes = volumeIndexes;
+  }
+
+  /**
+   * Get the array of blocks.
+   * 
+   * @return array of blocks
+   */
+  public ExtendedBlock[] getBlocks() {
+    return blocks;
+  }
+  
+  /**
+   * Get the list of volume identifiers in raw byte form.
+   * 
+   * @return list of ids
+   */
+  public List<byte[]> getVolumeIds() {
+    return volumeIds;
+  }
+
+  /**
+   * Get a list of indexes into the list of volume identifiers, one per block.
+   * A value of Integer.MAX_VALUE indicates that the block was not found.
+   * @return list of indexes
+   */
+  public List<Integer> getVolumeIndexes() {
+    return volumeIndexes;
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java Fri Aug 17 16:52:07 2012
@@ -18,19 +18,31 @@
 package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -106,4 +118,38 @@ public class ClientDatanodeProtocolServe
         .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
         .build();
   }
+
+  @Override
+  public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations(
+      RpcController controller, GetHdfsBlockLocationsRequestProto request)
+      throws ServiceException {
+    HdfsBlocksMetadata resp;
+    try {
+      // Construct the Lists to make the actual call
+      List<ExtendedBlock> blocks = 
+          new ArrayList<ExtendedBlock>(request.getBlocksCount());
+      for (ExtendedBlockProto b : request.getBlocksList()) {
+        blocks.add(PBHelper.convert(b));
+      }
+      List<Token<BlockTokenIdentifier>> tokens = 
+          new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount());
+      for (BlockTokenIdentifierProto b : request.getTokensList()) {
+        tokens.add(PBHelper.convert(b));
+      }
+      // Call the real implementation
+      resp = impl.getHdfsBlocksMetadata(blocks, tokens);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    List<ByteString> volumeIdsByteStrings = 
+        new ArrayList<ByteString>(resp.getVolumeIds().size());
+    for (byte[] b : resp.getVolumeIds()) {
+      volumeIdsByteStrings.add(ByteString.copyFrom(b));
+    }
+    // Build and return the response
+    Builder builder = GetHdfsBlockLocationsResponseProto.newBuilder();
+    builder.addAllVolumeIds(volumeIdsByteStrings);
+    builder.addAllVolumeIndexes(resp.getVolumeIndexes());
+    return builder.build();
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java Fri Aug 17 16:52:07 2012
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolP
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.net.SocketFactory;
 
@@ -33,12 +35,17 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -50,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -207,4 +215,44 @@ public class ClientDatanodeProtocolTrans
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
   }
-}
\ No newline at end of file
+
+  @Override
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+      List<Token<BlockTokenIdentifier>> tokens) throws IOException {
+    // Convert to proto objects
+    List<ExtendedBlockProto> blocksProtos = 
+        new ArrayList<ExtendedBlockProto>(blocks.size());
+    List<BlockTokenIdentifierProto> tokensProtos = 
+        new ArrayList<BlockTokenIdentifierProto>(tokens.size());
+    for (ExtendedBlock b : blocks) {
+      blocksProtos.add(PBHelper.convert(b));
+    }
+    for (Token<BlockTokenIdentifier> t : tokens) {
+      tokensProtos.add(PBHelper.convert(t));
+    }
+    // Build the request
+    GetHdfsBlockLocationsRequestProto request = 
+        GetHdfsBlockLocationsRequestProto.newBuilder()
+        .addAllBlocks(blocksProtos)
+        .addAllTokens(tokensProtos)
+        .build();
+    // Send the RPC
+    GetHdfsBlockLocationsResponseProto response;
+    try {
+      response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    // Convert the raw volume ids in the response to byte arrays
+    List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
+    List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
+    for (ByteString bs : volumeIdsByteStrings) {
+      volumeIds.add(bs.toByteArray());
+    }
+    // List of indexes into the list of volumes, one per block
+    List<Integer> volumeIndexes = response.getVolumeIndexesList();
+    // Pair the requested blocks with the raw volume ids and per-block indexes
+    return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
+        volumeIds, volumeIndexes);
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Aug 17 16:52:07 2012
@@ -96,6 +96,7 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -279,6 +280,7 @@ public class DataNode extends Configured
   private final String userWithLocalPathAccess;
   private boolean connectToDnViaHostname;
   ReadaheadPool readaheadPool;
+  private final boolean getHdfsBlockLocationsEnabled;
 
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
@@ -303,6 +305,9 @@ public class DataNode extends Configured
     this.connectToDnViaHostname = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.getHdfsBlockLocationsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
     try {
       hostName = getHostName(conf);
       LOG.info("Configured hostname is " + hostName);
@@ -1033,6 +1038,25 @@ public class DataNode extends Configured
     metrics.incrBlocksGetLocalPathInfo();
     return info;
   }
+
+  @Override
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+      List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
+      UnsupportedOperationException {
+    if (!getHdfsBlockLocationsEnabled) {
+      throw new UnsupportedOperationException("DataNode#getHdfsBlocksMetadata"
+          + " is not enabled in datanode config");
+    }
+    if (blocks.size() != tokens.size()) {
+      throw new IOException("Differing number of blocks and tokens");
+    }
+    // Check access for each block
+    for (int i = 0; i < blocks.size(); i++) {
+      checkBlockToken(blocks.get(i), tokens.get(i), 
+          BlockTokenSecretManager.AccessMode.READ);
+    }
+    return data.getHdfsBlocksMetadata(blocks);
+  }
   
   private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
       AccessMode accessMode) throws IOException {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Aug 17 16:52:07 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
@@ -373,4 +374,16 @@ public interface FsDatasetSpi<V extends 
    */
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
       ) throws IOException;
+
+  /**
+   * Get a {@link HdfsBlocksMetadata} describing the volume on which each of
+   * the blocks in <code>blocks</code> is stored.
+   * <p>
+   * The returned metadata carries one volume index per entry of
+   * <code>blocks</code>, in the same order. The datanode RPC contract
+   * (ClientDatanodeProtocol.proto) uses an index of
+   * {@link Integer#MAX_VALUE} for a block that is not present on any volume.
+   * Implementations that cannot answer this query may throw
+   * {@link UnsupportedOperationException}.
+   * 
+   * @param blocks List of blocks for which to return metadata
+   * @return metadata for the list of blocks
+   * @throws IOException if the metadata cannot be determined
+   */
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+      throws IOException;
+
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Aug 17 16:52:07 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1667,6 +1668,43 @@ class FsDatasetImpl implements FsDataset
         datafile.getAbsolutePath(), metafile.getAbsolutePath());
     return info;
   }
+  
+  /**
+   * Maps each requested block to the index of the volume it is stored on.
+   * <p>
+   * Volume ids are assigned by enumerating this datanode's volumes, so the id
+   * list has one entry per volume. Each block, in request order, gets the
+   * index of its volume in that list, or {@link Integer#MAX_VALUE} if its
+   * replica cannot be looked up or its volume is not among the enumerated
+   * ones.
+   */
+  @Override // FsDatasetSpi
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+      throws IOException {
+    // Read the volume list once so that the enumerated ids and the per-block
+    // lookups below all refer to the same list, even if the field is
+    // reassigned while this method runs.
+    final List<FsVolumeImpl> curVolumes = volumes.volumes;
+    // List of VolumeIds, one per volume on the datanode
+    List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(curVolumes.size());
+    // List of indexes into the list of VolumeIds, pointing at the VolumeId of
+    // the volume that the block is on
+    List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
+    // Initialize the list of VolumeIds simply by enumerating the volumes.
+    // NOTE(review): a single byte only distinguishes 256 volumes (ids wrap
+    // beyond that); widening it needs the client-side decoder to agree, so
+    // the encoding is left unchanged here.
+    for (int i = 0; i < curVolumes.size(); i++) {
+      blocksVolumeIds.add(new byte[] { (byte) i });
+    }
+    // Determine the index of the VolumeId of each block's volume, by comparing
+    // the block's volume against the enumerated volumes
+    for (ExtendedBlock block : blocks) {
+      FsVolumeSpi blockVolume = null;
+      try {
+        blockVolume = getReplicaInfo(block).getVolume();
+      } catch (IOException e) {
+        // NOTE(review): assumes getReplicaInfo reports an unknown replica by
+        // throwing (ReplicaNotFoundException upstream) -- confirm. Such a
+        // block is reported as not present instead of failing the lookup for
+        // every other block in the batch.
+      }
+      // Indicates that the block is not present, or not found in a data dir
+      int volumeIndex = Integer.MAX_VALUE;
+      if (blockVolume != null) {
+        int candidate = 0;
+        for (FsVolumeImpl volume : curVolumes) {
+          // This comparison of references should be safe
+          if (blockVolume == volume) {
+            volumeIndex = candidate;
+            break;
+          }
+          candidate++;
+        }
+      }
+      blocksVolumeIndexes.add(volumeIndex);
+    }
+    return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
+        blocksVolumeIds, blocksVolumeIndexes);
+  }
 
   @Override
   public RollingLogs createRollingLogs(String bpid, String prefix

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto Fri Aug 17 16:52:07 2012
@@ -90,6 +90,26 @@ message GetBlockLocalPathInfoResponsePro
 }
 
 /**
+ * Request for the volume on which each of a set of blocks is stored.
+ *
+ * blocks - list of ExtendedBlocks on which we are querying additional info
+ * tokens - list of access tokens, one per entry of blocks and in the same
+ *          order; the token at each position must grant READ access to the
+ *          block at that position. The datanode rejects the request if the
+ *          two lists differ in length.
+ */
+message GetHdfsBlockLocationsRequestProto {
+  repeated ExtendedBlockProto blocks = 1;
+  repeated BlockTokenIdentifierProto tokens = 2;
+}
+
+/**
+ * Response to GetHdfsBlockLocationsRequestProto.
+ *
+ * volumeIds - id of each volume on the datanode, potentially multiple bytes
+ * volumeIndexes - for each requested block, in request order, an index into
+ *               volumeIds specifying the volume on which it is located. If
+ *               the block is not present on any volume, the index is set to
+ *               2147483647 (Java Integer.MAX_VALUE), not the uint32 maximum.
+ */
+message GetHdfsBlockLocationsResponseProto {
+  repeated bytes volumeIds = 1;
+  repeated uint32 volumeIndexes = 2;
+}
+
+/**
  * Protocol used from client to the Datanode.
  * See the request and response for details of rpc call.
  */
@@ -119,4 +139,11 @@ service ClientDatanodeProtocolService {
    */
   rpc getBlockLocalPathInfo(GetBlockLocalPathInfoRequestProto)
       returns(GetBlockLocalPathInfoResponseProto);
+
+  /**
+   * Retrieve additional HDFS-specific metadata about a set of blocks stored
+   * on the local file system: for each block, the datanode volume holding it.
+   * Datanode-side support is controlled by
+   * dfs.datanode.hdfs-blocks-metadata.enabled, which defaults to false.
+   */
+  rpc getHdfsBlockLocations(GetHdfsBlockLocationsRequestProto)
+      returns(GetHdfsBlockLocationsResponseProto);
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Aug 17 16:52:07 2012
@@ -78,7 +78,7 @@
 
 <property>
   <name>dfs.datanode.handler.count</name>
-  <value>3</value>
+  <value>10</value>
   <description>The number of server threads for the datanode.</description>
 </property>
 
@@ -1051,4 +1051,28 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
+  <value>false</value>
+  <description>
+    Whether to enable datanode-side support for the experimental DistributedFileSystem#getFileBlockStorageLocations API.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.file-block-storage-locations.num-threads</name>
+  <value>10</value>
+  <description>
+    Number of threads used for making parallel RPCs in DistributedFileSystem#getFileBlockStorageLocations().
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.file-block-storage-locations.timeout</name>
+  <value>60</value>
+  <description>
+    Timeout (in seconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations().
+  </description>
+</property>
+
 </configuration>

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Aug 17 16:52:07 2012
@@ -27,10 +27,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -38,6 +42,7 @@ import org.apache.hadoop.fs.FileChecksum
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -570,4 +575,93 @@ public class TestDistributedFileSystem {
     testDFSClient();
     testFileChecksum();
   }
+
+  /**
+   * Tests the normal path of batching up BlockLocation[]s to be passed to a
+   * single
+   * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
+   * call
+   */
+  @Test
+  public void testGetFileBlockStorageLocationsBatching() throws Exception {
+    final Configuration conf = getTestConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        true);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    // Create two files
+    Path tmpFile1 = new Path("/tmpfile1.dat");
+    Path tmpFile2 = new Path("/tmpfile2.dat");
+    DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
+    DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
+    // Get locations of blocks of both files and concat together
+    BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
+    BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
+    BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
+        blockLocs2);
+    // Fetch VolumeBlockLocations in batch
+    BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+        .asList(blockLocs));
+    int counter = 0;
+    // Print out the list of ids received for each block
+    for (BlockStorageLocation l : locs) {
+      for (int i = 0; i < l.getVolumeIds().length; i++) {
+        VolumeId id = l.getVolumeIds()[i];
+        String name = l.getNames()[i];
+        if (id != null) {
+          System.out.println("Datanode " + name + " has block " + counter
+              + " on volume id " + id.toString());
+        }
+      }
+      counter++;
+    }
+    assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
+        locs.length);
+    for (BlockStorageLocation l : locs) {
+      assertEquals("Expected two replicas for each block", 2,
+          l.getVolumeIds().length);
+      for (int i = 0; i < l.getVolumeIds().length; i++) {
+        VolumeId id = l.getVolumeIds()[i];
+        String name = l.getNames()[i];
+        assertTrue("Expected block to be valid on datanode " + name,
+            id.isValid());
+      }
+    }
+  }
+
+  /**
+   * Tests error paths for
+   * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}:
+   * a two-replica block is written to a two-datanode cluster, one datanode is
+   * stopped, and exactly one replica must be reported with a valid volume id.
+   */
+  @Test
+  public void testGetFileBlockStorageLocationsError() throws Exception {
+    final Configuration conf = getTestConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        true);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Create a file
+      Path tmpFile = new Path("/tmpfile1.dat");
+      DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADL);
+      // Get locations of blocks of the file
+      BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
+      // Stop a datanode to simulate a failure
+      cluster.stopDataNode(0);
+      // Fetch BlockStorageLocations
+      BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+          .asList(blockLocs));
+
+      assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
+          locs.length);
+
+      for (BlockStorageLocation l : locs) {
+        VolumeId[] volumeIds = l.getVolumeIds();
+        assertEquals("Expected two replicas for each block", 2,
+            volumeIds.length);
+        // Fail with a message rather than a NullPointerException below
+        assertTrue("Expected a volume id for each replica",
+            volumeIds[0] != null && volumeIds[1] != null);
+        assertTrue("Expected one valid and one invalid replica",
+            volumeIds[0].isValid() ^ volumeIds[1].isValid());
+      }
+    } finally {
+      // Shut the mini cluster down even if an assertion fails, so it does
+      // not outlive this test.
+      cluster.shutdown();
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Aug 17 16:52:07 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -961,6 +962,12 @@ public class SimulatedFSDataset implemen
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
     throw new UnsupportedOperationException();
   }
+  
+  /**
+   * Not supported by the simulated dataset; always throws.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+      throws IOException {
+    // Name the operation so a test that reaches this path fails with a
+    // self-explanatory message.
+    throw new UnsupportedOperationException(
+        "getHdfsBlocksMetadata is not supported by SimulatedFSDataset");
+  }
 
   @Override
   public String[] getBlockPoolList() {



Mime
View raw message