hadoop-common-commits mailing list archives

From szets...@apache.org
Subject [4/6] Merge branch 'HDFS-6584' into trunk
Date Thu, 25 Sep 2014 20:51:43 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
new file mode 100644
index 0000000..f1837ae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.*;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
+import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
+import org.apache.hadoop.hdfs.server.balancer.Matcher;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.text.DateFormat;
+import java.util.*;
+
+@InterfaceAudience.Private
+public class Mover {
+  static final Log LOG = LogFactory.getLog(Mover.class);
+
+  static final Path MOVER_ID_PATH = new Path("/system/mover.id");
+
+  private static class StorageMap {
+    private final StorageGroupMap<Source> sources
+        = new StorageGroupMap<Source>();
+    private final StorageGroupMap<StorageGroup> targets
+        = new StorageGroupMap<StorageGroup>();
+    private final EnumMap<StorageType, List<StorageGroup>> targetStorageTypeMap
+        = new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
+    
+    private StorageMap() {
+      for(StorageType t : StorageType.asList()) {
+        targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
+      }
+    }
+    
+    private void add(Source source, StorageGroup target) {
+      sources.put(source);
+      targets.put(target);
+      getTargetStorages(target.getStorageType()).add(target);
+    }
+    
+    private Source getSource(MLocation ml) {
+      return get(sources, ml);
+    }
+
+    private StorageGroup getTarget(MLocation ml) {
+      return get(targets, ml);
+    }
+
+    private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
+      return map.get(ml.datanode.getDatanodeUuid(), ml.storageType);
+    }
+    
+    private List<StorageGroup> getTargetStorages(StorageType t) {
+      return targetStorageTypeMap.get(t);
+    }
+  }
+
+  private final Dispatcher dispatcher;
+  private final StorageMap storages;
+  private final List<Path> targetPaths;
+
+  private final BlockStoragePolicy.Suite blockStoragePolicies;
+
+  Mover(NameNodeConnector nnc, Configuration conf) {
+    final long movedWinWidth = conf.getLong(
+        DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
+        DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
+    final int moverThreads = conf.getInt(
+        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
+        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+    final int maxConcurrentMovesPerNode = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
+    this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
+        Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
+        maxConcurrentMovesPerNode, conf);
+    this.storages = new StorageMap();
+    this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
+    this.targetPaths = nnc.getTargetPaths();
+  }
+
+  void init() throws IOException {
+    final List<DatanodeStorageReport> reports = dispatcher.init();
+    for(DatanodeStorageReport r : reports) {
+      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+      for(StorageType t : StorageType.asList()) {
+        final long maxRemaining = getMaxRemaining(r, t);
+        if (maxRemaining > 0L) {
+          final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); 
+          final StorageGroup target = dn.addTarget(t, maxRemaining);
+          storages.add(source, target);
+        }
+      }
+    }
+  }
+
+  private ExitStatus run() {
+    try {
+      init();
+      boolean hasRemaining = new Processor().processNamespace();
+      return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS;
+    } catch (IllegalArgumentException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ExitStatus.ILLEGAL_ARGUMENTS;
+    } catch (IOException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ExitStatus.IO_EXCEPTION;
+    } finally {
+      dispatcher.shutdownNow();
+    }
+  }
+
+  DBlock newDBlock(Block block, List<MLocation> locations) {
+    final DBlock db = new DBlock(block);
+    for(MLocation ml : locations) {
+      db.addLocation(storages.getTarget(ml));
+    }
+    return db;
+  }
+
+  private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) {
+    long max = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        if (r.getRemaining() > max) {
+          max = r.getRemaining();
+        }
+      }
+    }
+    return max;
+  }
+
+  /**
+   * convert a snapshot path to non-snapshot path. E.g.,
+   * /foo/.snapshot/snapshot-name/bar --> /foo/bar
+   */
+  private static String convertSnapshotPath(String[] pathComponents) {
+    StringBuilder sb = new StringBuilder(Path.SEPARATOR);
+    for (int i = 0; i < pathComponents.length; i++) {
+      if (pathComponents[i].equals(HdfsConstants.DOT_SNAPSHOT_DIR)) {
+        i++; // skip the snapshot name component that follows ".snapshot"
+      } else {
+        if (sb.length() > 1) {
+          sb.append(Path.SEPARATOR); // restore the separator between components
+        }
+        sb.append(pathComponents[i]);
+      }
+    }
+    return sb.toString();
+  }
+
+  class Processor {
+    private final DFSClient dfs;
+    private final List<String> snapshottableDirs = new ArrayList<String>();
+
+    Processor() {
+      dfs = dispatcher.getDistributedFileSystem().getClient();
+    }
+
+    private void getSnapshottableDirs() {
+      SnapshottableDirectoryStatus[] dirs = null;
+      try {
+        dirs = dfs.getSnapshottableDirListing();
+      } catch (IOException e) {
+        LOG.warn("Failed to get snapshottable directories."
+            + " Ignore and continue.", e);
+      }
+      if (dirs != null) {
+        for (SnapshottableDirectoryStatus dir : dirs) {
+          snapshottableDirs.add(dir.getFullPath().toString());
+        }
+      }
+    }
+
+    /**
+     * @return true if the given path is a snapshot path and the corresponding
+     * INode is still in the current fsdirectory.
+     */
+    private boolean isSnapshotPathInCurrent(String path) throws IOException {
+      // if the parent path contains "/.snapshot/", this is a snapshot path
+      if (path.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
+        String[] pathComponents = INode.getPathNames(path);
+        if (HdfsConstants.DOT_SNAPSHOT_DIR
+            .equals(pathComponents[pathComponents.length - 2])) {
+          // this is a path for a specific snapshot (e.g., /foo/.snapshot/s1)
+          return false;
+        }
+        String nonSnapshotPath = convertSnapshotPath(pathComponents);
+        return dfs.getFileInfo(nonSnapshotPath) != null;
+      } else {
+        return false;
+      }
+    }
+
+    /**
+     * @return whether there is still remaining migration work for the next
+     *         round
+     */
+    private boolean processNamespace() {
+      getSnapshottableDirs();
+      boolean hasRemaining = true;
+      try {
+        for (Path target : targetPaths) {
+          hasRemaining = processDirRecursively("", dfs.getFileInfo(target
+              .toUri().getPath()));
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to get root directory status. Ignore and continue.", e);
+      }
+      // wait for pending move to finish and retry the failed migration
+      hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
+      return hasRemaining;
+    }
+
+    /**
+     * @return whether there is still remaining migration work for the next
+     *         round
+     */
+    private boolean processChildrenList(String fullPath) {
+      boolean hasRemaining = false;
+      for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
+        final DirectoryListing children;
+        try {
+          children = dfs.listPaths(fullPath, lastReturnedName, true);
+        } catch(IOException e) {
+          LOG.warn("Failed to list directory " + fullPath
+              + ". Ignore the directory and continue.", e);
+          return hasRemaining;
+        }
+        if (children == null) {
+          return hasRemaining;
+        }
+        for (HdfsFileStatus child : children.getPartialListing()) {
+          hasRemaining |= processDirRecursively(fullPath, child);
+        }
+        if (children.hasMore()) {
+          lastReturnedName = children.getLastName();
+        } else {
+          return hasRemaining;
+        }
+      }
+    }
+
+    /** @return whether the migration requires another round */
+    private boolean processDirRecursively(String parent,
+                                          HdfsFileStatus status) {
+      String fullPath = status.getFullName(parent);
+      boolean hasRemaining = false;
+      if (status.isDir()) {
+        if (!fullPath.endsWith(Path.SEPARATOR)) {
+          fullPath = fullPath + Path.SEPARATOR;
+        }
+
+        hasRemaining = processChildrenList(fullPath);
+        // process snapshots if this is a snapshottable directory
+        if (snapshottableDirs.contains(fullPath)) {
+          final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
+          hasRemaining |= processChildrenList(dirSnapshot);
+        }
+      } else if (!status.isSymlink()) { // file
+        try {
+          // skip a snapshot path whose corresponding INode is still in the
+          // current directory tree; it will be migrated via its current path
+          if (!isSnapshotPathInCurrent(fullPath)) {
+            hasRemaining = processFile((HdfsLocatedFileStatus)status);
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to check the status of " + parent
+              + ". Ignore it and continue.", e);
+          return false;
+        }
+      }
+      return hasRemaining;
+    }
+
+    /** @return true if it is necessary to run another round of migration */
+    private boolean processFile(HdfsLocatedFileStatus status) {
+      final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
+          status.getStoragePolicy());
+      final List<StorageType> types = policy.chooseStorageTypes(
+          status.getReplication());
+
+      final LocatedBlocks locatedBlocks = status.getBlockLocations();
+      boolean hasRemaining = false;
+      final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
+      List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
+      for(int i = 0; i < lbs.size(); i++) {
+        if (i == lbs.size() - 1 && !lastBlkComplete) {
+          // last block is incomplete, skip it
+          continue;
+        }
+        LocatedBlock lb = lbs.get(i);
+        final StorageTypeDiff diff = new StorageTypeDiff(types,
+            lb.getStorageTypes());
+        if (!diff.removeOverlap()) {
+          if (scheduleMoves4Block(diff, lb)) {
+            hasRemaining |= (diff.existing.size() > 1 &&
+                diff.expected.size() > 1);
+          }
+        }
+      }
+      return hasRemaining;
+    }
+
+    boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+      final List<MLocation> locations = MLocation.toLocations(lb);
+      Collections.shuffle(locations);
+      final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
+
+      for (final StorageType t : diff.existing) {
+        for (final MLocation ml : locations) {
+          final Source source = storages.getSource(ml);
+          if (ml.storageType == t) {
+            // try to schedule one replica move.
+            if (scheduleMoveReplica(db, source, diff.expected)) {
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+
+    @VisibleForTesting
+    boolean scheduleMoveReplica(DBlock db, MLocation ml,
+                                List<StorageType> targetTypes) {
+      return scheduleMoveReplica(db, storages.getSource(ml), targetTypes);
+    }
+
+    boolean scheduleMoveReplica(DBlock db, Source source,
+        List<StorageType> targetTypes) {
+      // First, match nodes on the same node group if the cluster is node-group aware
+      if (dispatcher.getCluster().isNodeGroupAware()) {
+        if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
+          return true;
+        }
+      }
+      
+      // Then, match nodes on the same rack
+      if (chooseTarget(db, source, targetTypes, Matcher.SAME_RACK)) {
+        return true;
+      }
+      // At last, match all remaining nodes
+      return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
+    }
+
+    boolean chooseTarget(DBlock db, Source source,
+        List<StorageType> targetTypes, Matcher matcher) {
+      final NetworkTopology cluster = dispatcher.getCluster(); 
+      for (StorageType t : targetTypes) {
+        for(StorageGroup target : storages.getTargetStorages(t)) {
+          if (matcher.match(cluster, source.getDatanodeInfo(),
+              target.getDatanodeInfo())) {
+            final PendingMove pm = source.addPendingMove(db, target);
+            if (pm != null) {
+              dispatcher.executePendingMove(pm);
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+  }
+
+  static class MLocation {
+    final DatanodeInfo datanode;
+    final StorageType storageType;
+    final long size;
+    
+    MLocation(DatanodeInfo datanode, StorageType storageType, long size) {
+      this.datanode = datanode;
+      this.storageType = storageType;
+      this.size = size;
+    }
+    
+    static List<MLocation> toLocations(LocatedBlock lb) {
+      final DatanodeInfo[] datanodeInfos = lb.getLocations();
+      final StorageType[] storageTypes = lb.getStorageTypes();
+      final long size = lb.getBlockSize();
+      final List<MLocation> locations = new LinkedList<MLocation>();
+      for(int i = 0; i < datanodeInfos.length; i++) {
+        locations.add(new MLocation(datanodeInfos[i], storageTypes[i], size));
+      }
+      return locations;
+    }
+  }
+
+  @VisibleForTesting
+  static class StorageTypeDiff {
+    final List<StorageType> expected;
+    final List<StorageType> existing;
+
+    StorageTypeDiff(List<StorageType> expected, StorageType[] existing) {
+      this.expected = new LinkedList<StorageType>(expected);
+      this.existing = new LinkedList<StorageType>(Arrays.asList(existing));
+    }
+    
+    /**
+     * Remove the overlap between the expected types and the existing types.
+     * @return true if either the existing types or the expected types list
+     *         is empty after removing the overlap.
+     */
+    boolean removeOverlap() { 
+      for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
+        final StorageType t = i.next();
+        if (expected.remove(t)) {
+          i.remove();
+        }
+      }
+      return expected.isEmpty() || existing.isEmpty();
+    }
+    
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "{expected=" + expected
+          + ", existing=" + existing + "}";
+    }
+  }
+
+  static int run(Map<URI, List<Path>> namenodes, Configuration conf)
+      throws IOException, InterruptedException {
+    final long sleeptime =
+        conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
+        conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
+    LOG.info("namenodes = " + namenodes);
+    
+    List<NameNodeConnector> connectors = Collections.emptyList();
+    try {
+      connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
+            Mover.class.getSimpleName(), MOVER_ID_PATH, conf);
+
+      while (connectors.size() > 0) {
+        Collections.shuffle(connectors);
+        Iterator<NameNodeConnector> iter = connectors.iterator();
+        while (iter.hasNext()) {
+          NameNodeConnector nnc = iter.next();
+          final Mover m = new Mover(nnc, conf);
+          final ExitStatus r = m.run();
+
+          if (r == ExitStatus.SUCCESS) {
+            IOUtils.cleanup(LOG, nnc);
+            iter.remove();
+          } else if (r != ExitStatus.IN_PROGRESS) {
+            // must be an error status; return
+            return r.getExitCode();
+          }
+        }
+        Thread.sleep(sleeptime);
+      }
+      return ExitStatus.SUCCESS.getExitCode();
+    } finally {
+      for (NameNodeConnector nnc : connectors) {
+        IOUtils.cleanup(LOG, nnc);
+      }
+    }
+  }
+
+  static class Cli extends Configured implements Tool {
+    private static final String USAGE = "Usage: java "
+        + Mover.class.getSimpleName() + " [-p <files/dirs> | -f <local file>]"
+        + "\n\t-p <files/dirs>\ta space separated list of HDFS files/dirs to migrate."
+        + "\n\t-f <local file>\ta local file containing a list of HDFS files/dirs to migrate.";
+
+    private static Options buildCliOptions() {
+      Options opts = new Options();
+      Option file = OptionBuilder.withArgName("pathsFile").hasArg()
+          .withDescription("a local file containing files/dirs to migrate")
+          .create("f");
+      Option paths = OptionBuilder.withArgName("paths").hasArgs()
+          .withDescription("specify space separated files/dirs to migrate")
+          .create("p");
+      OptionGroup group = new OptionGroup();
+      group.addOption(file);
+      group.addOption(paths);
+      opts.addOptionGroup(group);
+      return opts;
+    }
+
+    private static String[] readPathFile(String file) throws IOException {
+      List<String> list = Lists.newArrayList();
+      BufferedReader reader = new BufferedReader(new FileReader(file));
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          if (!line.trim().isEmpty()) {
+            list.add(line);
+          }
+        }
+      } finally {
+        IOUtils.cleanup(LOG, reader);
+      }
+      return list.toArray(new String[list.size()]);
+    }
+
+    private static Map<URI, List<Path>> getNameNodePaths(CommandLine line,
+        Configuration conf) throws Exception {
+      Map<URI, List<Path>> map = Maps.newHashMap();
+      String[] paths = null;
+      if (line.hasOption("f")) {
+        paths = readPathFile(line.getOptionValue("f"));
+      } else if (line.hasOption("p")) {
+        paths = line.getOptionValues("p");
+      }
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      if (paths == null || paths.length == 0) {
+        for (URI namenode : namenodes) {
+          map.put(namenode, null);
+        }
+        return map;
+      }
+      final URI singleNs = namenodes.size() == 1 ?
+          namenodes.iterator().next() : null;
+      for (String path : paths) {
+        Path target = new Path(path);
+        if (!target.isUriPathAbsolute()) {
+          throw new IllegalArgumentException("The path " + target
+              + " is not absolute");
+        }
+        URI targetUri = target.toUri();
+        if ((targetUri.getAuthority() == null || targetUri.getScheme() ==
+            null) && singleNs == null) {
+          // each path must contain both scheme and authority information
+          // unless there is only one name service specified in the
+          // configuration
+          throw new IllegalArgumentException("The path " + target
+              + " does not contain scheme and authority thus cannot identify"
+              + " its name service");
+        }
+        URI key = singleNs;
+        if (singleNs == null) {
+          key = new URI(targetUri.getScheme(), targetUri.getAuthority(),
+              null, null, null);
+          if (!namenodes.contains(key)) {
+            throw new IllegalArgumentException("Cannot resolve the path " +
+                target + ". The namenode services specified in the " +
+                "configuration: " + namenodes);
+          }
+        }
+        List<Path> targets = map.get(key);
+        if (targets == null) {
+          targets = Lists.newArrayList();
+          map.put(key, targets);
+        }
+        targets.add(Path.getPathWithoutSchemeAndAuthority(target));
+      }
+      return map;
+    }
+
+    @VisibleForTesting
+    static Map<URI, List<Path>> getNameNodePathsToMove(Configuration conf,
+        String... args) throws Exception {
+      final Options opts = buildCliOptions();
+      CommandLineParser parser = new GnuParser();
+      CommandLine commandLine = parser.parse(opts, args, true);
+      return getNameNodePaths(commandLine, conf);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      final long startTime = Time.monotonicNow();
+      final Configuration conf = getConf();
+
+      try {
+        final Map<URI, List<Path>> map = getNameNodePathsToMove(conf, args);
+        return Mover.run(map, conf);
+      } catch (IOException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ExitStatus.IO_EXCEPTION.getExitCode();
+      } catch (InterruptedException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ExitStatus.INTERRUPTED.getExitCode();
+      } catch (ParseException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode();
+      } catch (IllegalArgumentException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode();
+      } finally {
+        System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
+        System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime));
+      }
+    }
+  }
+
+  /**
+   * Run a Mover in command line.
+   *
+   * @param args Command line arguments
+   */
+  public static void main(String[] args) {
+    if (DFSUtil.parseHelpArgument(args, Cli.USAGE, System.out, true)) {
+      System.exit(0);
+    }
+
+    try {
+      System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args));
+    } catch (Throwable e) {
+      LOG.error("Exiting " + Mover.class.getSimpleName()
+          + " due to an exception", e);
+      System.exit(-1);
+    }
+  }
+}
\ No newline at end of file

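The class above is the entry point of the new storage-policy mover. As a minimal, hypothetical sketch (the /cold-data path is only a placeholder), it can be driven programmatically through its public main(), which mirrors the Cli USAGE string shown above:

    import org.apache.hadoop.hdfs.server.mover.Mover;

    public class RunMover {
      public static void main(String[] args) {
        // Roughly equivalent to running the Mover tool with "-p /cold-data".
        // Mover.main parses the -p/-f options via Cli, runs one Mover pass per
        // namenode until no migration work remains, and then exits the JVM
        // with the tool's exit code.
        Mover.main(new String[] {"-p", "/cold-data"});
      }
    }
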
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 7c96302..cc5e995 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -274,6 +275,12 @@ public class FSDirectory implements Closeable {
     skipQuotaCheck = true;
   }
 
+  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
+      long mtime, long atime, short replication, long preferredBlockSize) {
+    return new INodeFile(id, null, permissions, mtime, atime,
+        BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize, (byte)0);
+  }
+
   /**
    * Add the given filename to the fs.
    * @throws FileAlreadyExistsException
@@ -288,9 +295,8 @@ public class FSDirectory implements Closeable {
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
 
     long modTime = now();
-    INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
-        permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
-        preferredBlockSize);
+    INodeFile newNode = newINodeFile(namesystem.allocateNewInodeId(),
+        permissions, modTime, modTime, replication, preferredBlockSize);
     newNode.toUnderConstruction(clientName, clientMachine);
 
     boolean added = false;
@@ -326,14 +332,13 @@ public class FSDirectory implements Closeable {
     final INodeFile newNode;
     assert hasWriteLock();
     if (underConstruction) {
-      newNode = new INodeFile(id, null, permissions, modificationTime,
-          modificationTime, BlockInfo.EMPTY_ARRAY, replication,
-          preferredBlockSize);
+      newNode = newINodeFile(id, permissions, modificationTime,
+          modificationTime, replication, preferredBlockSize);
       newNode.toUnderConstruction(clientName, clientMachine);
 
     } else {
-      newNode = new INodeFile(id, null, permissions, modificationTime, atime,
-          BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
+      newNode = newINodeFile(id, permissions, modificationTime, atime,
+          replication, preferredBlockSize);
     }
 
     try {
@@ -1000,6 +1005,44 @@ public class FSDirectory implements Closeable {
     return file.getBlocks();
   }
 
+  /** Set block storage policy for a file or a directory */
+  void setStoragePolicy(String src, byte policyId)
+      throws IOException {
+    writeLock();
+    try {
+      unprotectedSetStoragePolicy(src, policyId);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  void unprotectedSetStoragePolicy(String src, byte policyId)
+      throws IOException {
+    assert hasWriteLock();
+    final INodesInPath iip = getINodesInPath4Write(src, true);
+    final INode inode = iip.getLastINode();
+    if (inode == null) {
+      throw new FileNotFoundException("File/Directory does not exist: " + src);
+    }
+    final int snapshotId = iip.getLatestSnapshotId();
+    if (inode.isFile()) {
+      inode.asFile().setStoragePolicyID(policyId, snapshotId);
+    } else if (inode.isDirectory()) {
+      setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);  
+    } else {
+      throw new FileNotFoundException(src + " is not a file or directory");
+    }
+  }
+
+  private void setDirStoragePolicy(INodeDirectory inode, byte policyId,
+      int latestSnapshotId) throws IOException {
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    XAttr xAttr = BlockStoragePolicy.buildXAttr(policyId);
+    List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, Arrays.asList(xAttr),
+        EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+    XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
+  }
+
   /**
    * @param path the file path
    * @return the block size of the file. 
@@ -1331,6 +1374,11 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
+    return inodePolicy != BlockStoragePolicy.ID_UNSPECIFIED ? inodePolicy :
+        parentPolicy;
+  }
+
   /**
    * Get a partial listing of the indicated directory
    *
@@ -1345,7 +1393,8 @@ public class FSDirectory implements Closeable {
    * @return a partial listing starting after startAfter
    */
   DirectoryListing getListing(String src, byte[] startAfter,
-      boolean needLocation) throws UnresolvedLinkException, IOException {
+      boolean needLocation, boolean isSuperUser)
+      throws UnresolvedLinkException, IOException {
     String srcs = normalizePath(src);
     final boolean isRawPath = isReservedRawName(src);
 
@@ -1354,18 +1403,19 @@ public class FSDirectory implements Closeable {
       if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
         return getSnapshotsListing(srcs, startAfter);
       }
-      final INodesInPath inodesInPath = getINodesInPath(srcs, true);
-      final INode[] inodes = inodesInPath.getINodes();
+      final INodesInPath inodesInPath = getLastINodeInPath(srcs);
       final int snapshot = inodesInPath.getPathSnapshotId();
-      final INode targetNode = inodes[inodes.length - 1];
+      final INode targetNode = inodesInPath.getLastINode();
       if (targetNode == null)
         return null;
+      byte parentStoragePolicy = isSuperUser ?
+          targetNode.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
       
       if (!targetNode.isDirectory()) {
         return new DirectoryListing(
             new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
-                targetNode, needLocation, snapshot, isRawPath,
-                inodesInPath)}, 0);
+                targetNode, needLocation, parentStoragePolicy, snapshot,
+                isRawPath, inodesInPath)}, 0);
       }
 
       final INodeDirectory dirInode = targetNode.asDirectory();
@@ -1378,8 +1428,11 @@ public class FSDirectory implements Closeable {
       HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
       for (int i=0; i<numOfListing && locationBudget>0; i++) {
         INode cur = contents.get(startChild+i);
-        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
-            needLocation, snapshot, isRawPath, inodesInPath);
+        byte curPolicy = isSuperUser && !cur.isSymlink()?
+            cur.getLocalStoragePolicyID(): BlockStoragePolicy.ID_UNSPECIFIED;
+        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation,
+            getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot,
+            isRawPath, inodesInPath);
         listingCnt++;
         if (needLocation) {
             // Once we  hit lsLimit locations, stop.
@@ -1430,7 +1483,8 @@ public class FSDirectory implements Closeable {
     for (int i = 0; i < numOfListing; i++) {
       Root sRoot = snapshots.get(i + skipSize).getRoot();
       listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
-          Snapshot.CURRENT_STATE_ID, false, null);
+          BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
+          false, null);
     }
     return new DirectoryListing(
         listing, snapshots.size() - skipSize - numOfListing);
@@ -1440,10 +1494,12 @@ public class FSDirectory implements Closeable {
    * @param src The string representation of the path to the file
    * @param resolveLink whether to throw UnresolvedLinkException
    * @param isRawPath true if a /.reserved/raw pathname was passed by the user
+   * @param includeStoragePolicy whether to include storage policy
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  HdfsFileStatus getFileInfo(String src, boolean resolveLink, boolean isRawPath)
+  HdfsFileStatus getFileInfo(String src, boolean resolveLink,
+      boolean isRawPath, boolean includeStoragePolicy)
     throws IOException {
     String srcs = normalizePath(src);
     readLock();
@@ -1454,9 +1510,10 @@ public class FSDirectory implements Closeable {
       final INodesInPath inodesInPath = getINodesInPath(srcs, resolveLink);
       final INode[] inodes = inodesInPath.getINodes();
       final INode i = inodes[inodes.length - 1];
-
-      return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
-          inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath);
+      byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
+          i.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
+      return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
+          policyId, inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath);
     } finally {
       readUnlock();
     }
@@ -1473,7 +1530,7 @@ public class FSDirectory implements Closeable {
       throws UnresolvedLinkException {
     if (getINode4DotSnapshot(src) != null) {
       return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
-          HdfsFileStatus.EMPTY_NAME, -1L, 0, null);
+          HdfsFileStatus.EMPTY_NAME, -1L, 0, null, BlockStoragePolicy.ID_UNSPECIFIED);
     }
     return null;
   }
@@ -2312,19 +2369,22 @@ public class FSDirectory implements Closeable {
    * @throws IOException if any error occurs
    */
   private HdfsFileStatus createFileStatus(byte[] path, INode node,
-      boolean needLocation, int snapshot, boolean isRawPath,
-      INodesInPath iip)
+      boolean needLocation, byte storagePolicy, int snapshot,
+      boolean isRawPath, INodesInPath iip)
       throws IOException {
     if (needLocation) {
-      return createLocatedFileStatus(path, node, snapshot, isRawPath, iip);
+      return createLocatedFileStatus(path, node, storagePolicy, snapshot,
+          isRawPath, iip);
     } else {
-      return createFileStatus(path, node, snapshot, isRawPath, iip);
+      return createFileStatus(path, node, storagePolicy, snapshot,
+          isRawPath, iip);
     }
   }
+
   /**
    * Create FileStatus by file INode 
    */
-   HdfsFileStatus createFileStatus(byte[] path, INode node,
+  HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
       int snapshot, boolean isRawPath, INodesInPath iip) throws IOException {
      long size = 0;     // length is zero for directories
      short replication = 0;
@@ -2362,14 +2422,15 @@ public class FSDirectory implements Closeable {
         path,
         node.getId(),
         childrenNum,
-        feInfo);
+        feInfo,
+        storagePolicy);
   }
 
   /**
    * Create FileStatus with location info by file INode
    */
-  private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
-      INode node, int snapshot, boolean isRawPath,
+  private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
+      byte storagePolicy, int snapshot, boolean isRawPath,
       INodesInPath iip) throws IOException {
     assert hasReadLock();
     long size = 0; // length is zero for directories
@@ -2411,7 +2472,7 @@ public class FSDirectory implements Closeable {
           getPermissionForFileStatus(node, snapshot, isEncrypted),
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum, feInfo);
+          node.getId(), loc, childrenNum, feInfo, storagePolicy);
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = namesystem.getCacheManager();

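The FSDirectory changes above persist a per-inode storage policy: for a file the policy id is set directly on the INodeFile, while for a directory it is stored as an xattr built by BlockStoragePolicy.buildXAttr. A brief sketch of the client-side flow follows; it assumes the DistributedFileSystem#setStoragePolicy API and the built-in "COLD" policy name introduced by this branch, and uses an illustrative /cold-data path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class SetColdPolicy {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf); // assumes fs.defaultFS points at HDFS
        try {
          // Attach the "COLD" policy to a directory; on the NameNode this goes
          // through FSNamesystem#setStoragePolicy -> FSDirectory#setStoragePolicy
          // and is recorded as an OP_SET_STORAGE_POLICY edit (see below).
          ((DistributedFileSystem) fs).setStoragePolicy(
              new Path("/cold-data"), "COLD");
        } finally {
          fs.close();
        }
      }
    }

Once the policy is set, running the Mover (previous file) migrates existing replicas onto storage types that match the policy.
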
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 6bc0319..70a473f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
@@ -829,7 +830,16 @@ public class FSEditLog implements LogsPurgeable {
       .setReplication(replication);
     logEdit(op);
   }
-  
+
+  /** 
+   * Add set storage policy id record to edit log
+   */
+  void logSetStoragePolicy(String src, byte policyId) {
+    SetStoragePolicyOp op = SetStoragePolicyOp.getInstance(cache.get())
+        .setPath(src).setPolicyId(policyId);
+    logEdit(op);
+  }
+
   /** Add set namespace quota record to edit log
    * 
    * @param src the string representation of the path to a directory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 029de6c..83761bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -79,6 +80,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
@@ -371,7 +373,8 @@ public class FSEditLogLoader {
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
           HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
-              HdfsFileStatus.EMPTY_NAME, newFile, Snapshot.CURRENT_STATE_ID,
+              HdfsFileStatus.EMPTY_NAME, newFile,
+              BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
               false, iip);
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
               addCloseOp.rpcCallId, stat);
@@ -838,6 +841,13 @@ public class FSEditLogLoader {
       }
       break;
     }
+    case OP_SET_STORAGE_POLICY: {
+      SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
+      fsDir.unprotectedSetStoragePolicy(
+          renameReservedPathsOnUpgrade(setStoragePolicyOp.path, logVersion),
+          setStoragePolicyOp.policyId);
+      break;
+    }
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 6caee17..07bba39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -61,6 +61,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -195,6 +196,7 @@ public abstract class FSEditLogOp {
           OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
       inst.put(OP_SET_XATTR, new SetXAttrOp());
       inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
+      inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
     }
     
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -3800,6 +3802,71 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setStoragePolicy} */
+  static class SetStoragePolicyOp extends FSEditLogOp {
+    String path;
+    byte policyId;
+
+    private SetStoragePolicyOp() {
+      super(OP_SET_STORAGE_POLICY);
+    }
+
+    static SetStoragePolicyOp getInstance(OpInstanceCache cache) {
+      return (SetStoragePolicyOp) cache.get(OP_SET_STORAGE_POLICY);
+    }
+
+    SetStoragePolicyOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    SetStoragePolicyOp setPolicyId(byte policyId) {
+      this.policyId = policyId;
+      return this;
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(path, out);
+      out.writeByte(policyId);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.path = FSImageSerialization.readString(in);
+      this.policyId = in.readByte();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SetStoragePolicyOp [path=");
+      builder.append(path);
+      builder.append(", policyId=");
+      builder.append(policyId);
+      builder.append(", opCode=");
+      builder.append(opCode);
+      builder.append(", txid=");
+      builder.append(txid);
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "PATH", path);
+      XMLUtils.addSaxString(contentHandler, "POLICYID",
+          Byte.valueOf(policyId).toString());
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.path = st.getValue("PATH");
+      this.policyId = Byte.valueOf(st.getValue("POLICYID"));
+    }
+  }  
+
   /**
    * Class for writing editlog ops
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index bf4bbb4..86be54a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -72,6 +72,7 @@ public enum FSEditLogOpCodes {
   OP_ROLLING_UPGRADE_FINALIZE   ((byte) 42),
   OP_SET_XATTR                  ((byte) 43),
   OP_REMOVE_XATTR               ((byte) 44),
+  OP_SET_STORAGE_POLICY         ((byte) 45),
 
   // Note that the current range of the valid OP code is 0~127
   OP_INVALID                    ((byte) -1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index 2677332..4aeb6cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -786,7 +786,7 @@ public class FSImageFormat {
         counter.increment();
       }
       final INodeFile file = new INodeFile(inodeId, localName, permissions,
-          modificationTime, atime, blocks, replication, blockSize);
+          modificationTime, atime, blocks, replication, blockSize, (byte)0);
       if (underConstruction) {
         file.toUnderConstruction(clientName, clientMachine);
       }
@@ -890,7 +890,7 @@ public class FSImageFormat {
       final long preferredBlockSize = in.readLong();
 
       return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
-          accessTime, replication, preferredBlockSize, null);
+          accessTime, replication, preferredBlockSize, (byte)0, null);
     }
 
     public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 2a46fc4..5631b2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -289,7 +289,8 @@ public final class FSImageFormatPBINode {
 
       final INodeFile file = new INodeFile(n.getId(),
           n.getName().toByteArray(), permissions, f.getModificationTime(),
-          f.getAccessTime(), blocks, replication, f.getPreferredBlockSize());
+          f.getAccessTime(), blocks, replication, f.getPreferredBlockSize(),
+          (byte)f.getStoragePolicyID());
 
       if (f.hasAcl()) {
         file.addAclFeature(new AclFeature(loadAclEntries(f.getAcl(),
@@ -398,7 +399,8 @@ public final class FSImageFormatPBINode {
           .setModificationTime(file.getModificationTime())
           .setPermission(buildPermissionStatus(file, state.getStringMap()))
           .setPreferredBlockSize(file.getPreferredBlockSize())
-          .setReplication(file.getFileReplication());
+          .setReplication(file.getFileReplication())
+          .setStoragePolicyID(file.getLocalStoragePolicyID());
 
       AclFeature f = file.getAclFeature();
       if (f != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 3956d9a..be70f4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -149,7 +149,7 @@ public class FSImageSerialization {
     assert numLocs == 0 : "Unexpected block locations";
 
     INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
-        modificationTime, blocks, blockReplication, preferredBlockSize);
+        modificationTime, blocks, blockReplication, preferredBlockSize, (byte)0);
     file.toUnderConstruction(clientName, clientMachine);
     return file;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 3788a11..b7ed16b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -166,6 +166,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -325,7 +326,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
       throws IOException {
     return (isAuditEnabled() && isExternalInvocation())
-        ? dir.getFileInfo(path, resolveSymlink, false) : null;
+        ? dir.getFileInfo(path, resolveSymlink, false, false) : null;
   }
   
   private void logAuditEvent(boolean succeeded, String cmd, String src)
@@ -2258,6 +2259,52 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return isFile;
   }
 
+  /**
+   * Set the storage policy for a file or a directory.
+   *
+   * @param src file/directory path
+   * @param policyName storage policy name
+   */
+  void setStoragePolicy(String src, final String policyName)
+      throws IOException {
+    try {
+      setStoragePolicyInt(src, policyName);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setStoragePolicy", src);
+      throw e;
+    }
+  }
+
+  private void setStoragePolicyInt(String src, final String policyName)
+      throws IOException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    waitForLoadingFSImage();
+    HdfsFileStatus fileStat;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot set storage policy for " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+
+      // get the corresponding policy and make sure the policy name is valid
+      BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
+      if (policy == null) {
+        throw new HadoopIllegalArgumentException(
+            "Cannot find a block policy with the name " + policyName);
+      }
+      dir.setStoragePolicy(src, policy.getId());
+      getEditLog().logSetStoragePolicy(src, policy.getId());
+      fileStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+
+    getEditLog().logSync();
+    logAuditEvent(true, "setStoragePolicy", src, null, fileStat);
+  }
+
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();
@@ -2495,7 +2542,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           clientMachine, create, overwrite, createParent, replication, 
           blockSize, suite, edek, logRetryCache);
       stat = dir.getFileInfo(src, false,
-          FSDirectory.isReservedRawName(srcArg));
+          FSDirectory.isReservedRawName(srcArg), false);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -2967,8 +3014,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
-    long blockSize;
-    int replication;
+    final long blockSize;
+    final int replication;
+    final byte storagePolicyID;
     DatanodeDescriptor clientNode = null;
 
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3003,13 +3051,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
               pendingFile.getFileUnderConstructionFeature().getClientMachine());
       replication = pendingFile.getFileReplication();
+      storagePolicyID = pendingFile.getStoragePolicyID();
     } finally {
       readUnlock();
     }
 
     // choose targets for the new block to be allocated.
-    final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget( 
-        src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
+    final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock( 
+        src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
+        storagePolicyID);
 
     // Part II.
     // Allocate a new block, add it to the INode and the BlocksMap. 
@@ -3197,6 +3247,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
     final DatanodeDescriptor clientnode;
     final long preferredblocksize;
+    final byte storagePolicyID;
     final List<DatanodeStorageInfo> chosen;
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
@@ -3223,6 +3274,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
               .getClientMachine();
       clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       preferredblocksize = file.getPreferredBlockSize();
+      storagePolicyID = file.getStoragePolicyID();
 
       //find datanode storages
       final DatanodeManager dm = blockManager.getDatanodeManager();
@@ -3232,10 +3284,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     // choose new datanodes.
-    final DatanodeStorageInfo[] targets = blockManager.getBlockPlacementPolicy(
-        ).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
-            // TODO: get storage type from the file
-        excludes, preferredblocksize, StorageType.DEFAULT);
+    final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
+        src, numAdditionalNodes, clientnode, chosen, 
+        excludes, preferredblocksize, storagePolicyID);
     final LocatedBlock lb = new LocatedBlock(blk, targets);
     blockManager.setBlockToken(lb, AccessMode.COPY);
     return lb;
@@ -3922,12 +3973,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       checkOperation(OperationCategory.READ);
       src = resolvePath(src, pathComponents);
+      boolean isSuperUser = true;
       if (isPermissionEnabled) {
         checkPermission(pc, src, false, null, null, null, null, false,
             resolveLink);
+        isSuperUser = pc.isSuperUser();
       }
       stat = dir.getFileInfo(src, resolveLink,
-          FSDirectory.isReservedRawName(srcArg));
+          FSDirectory.isReservedRawName(srcArg), isSuperUser);
     } catch (AccessControlException e) {
       logAuditEvent(false, "getfileinfo", srcArg);
       throw e;
@@ -4156,7 +4209,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
    * Get the content summary for a specific file/dir.
    *
-   * @param src The string representation of the path to the file
+   * @param srcArg The string representation of the path to the file
    *
    * @throws AccessControlException if access is denied
    * @throws UnresolvedLinkException if a symlink is encountered.
@@ -4732,16 +4785,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
               "Can't find startAfter " + startAfterString);
         }
       }
-      
+
+      boolean isSuperUser = true;
       if (isPermissionEnabled) {
         if (dir.isDir(src)) {
           checkPathAccess(pc, src, FsAction.READ_EXECUTE);
         } else {
           checkTraverse(pc, src);
         }
+        isSuperUser = pc.isSuperUser();
       }
       logAuditEvent(true, "listStatus", srcArg);
-      dl = dir.getListing(src, startAfter, needLocation);
+      dl = dir.getListing(src, startAfter, needLocation, isSuperUser);
     } finally {
       readUnlock();
     }
@@ -4891,12 +4946,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   /**
    * Add the given symbolic link to the fs. Record it in the edits log.
-   * @param path
-   * @param target
-   * @param dirPerms
-   * @param createParent
-   * @param logRetryCache
-   * @param dir
    */
   private INodeSymlink addSymlink(String path, String target,
                                   PermissionStatus dirPerms,

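The FSNamesystem hunks above are the server-side entry point for setStoragePolicy: under the write lock the call checks safe mode, resolves the path, looks the policy name up in the BlockManager's suite, stamps the policy ID on the inode, and records the change in the edit log before syncing and auditing. A minimal client-side sketch, assuming the DistributedFileSystem#setStoragePolicy API (the same call the DFSAdmin change further down uses) and a made-up path and policy name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class SetStoragePolicyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // fs.defaultFS must point at an HDFS cluster for this cast to hold.
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        // Ask the NameNode to attach the named policy to the directory; the
        // validation and edit-log write happen in the FSNamesystem code above.
        dfs.setStoragePolicy(new Path("/archive"), "COLD");
      }
    }
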
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index c346be9..307f507 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@@ -684,6 +685,20 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
     return this;
   }
 
+  /**
+   * @return the latest block storage policy ID of the INode. Specifically,
+   * if a storage policy is directly specified on the INode, return the ID
+   * of that policy; otherwise follow the latest parental path and return
+   * the ID of the first storage policy specified along it.
+   */
+  public abstract byte getStoragePolicyID();
+
+  /**
+   * @return the storage policy directly specified on the INode. Return
+   * {@link BlockStoragePolicy#ID_UNSPECIFIED} if no policy has
+   * been specified.
+   */
+  public abstract byte getLocalStoragePolicyID();
 
   /**
    * Breaks {@code path} into components.
@@ -711,7 +726,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
    * @throws AssertionError if the given path is invalid.
    * @return array of path components.
    */
-  static String[] getPathNames(String path) {
+  public static String[] getPathNames(String path) {
     if (path == null || !path.startsWith(Path.SEPARATOR)) {
       throw new AssertionError("Absolute path required");
     }

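The INode change above splits the lookup in two: getLocalStoragePolicyID reports what is set directly on the inode, while getStoragePolicyID resolves the effective policy by falling back to the parent chain when nothing is set locally (the concrete implementations follow in INodeDirectory and INodeFile below). A simplified, self-contained sketch of that fallback, using a toy node type rather than the real INode hierarchy:

    /** Toy model of the parent-fallback resolution behind getStoragePolicyID. */
    class PolicyNode {
      static final byte ID_UNSPECIFIED = 0;   // stands in for BlockStoragePolicy.ID_UNSPECIFIED
      private final PolicyNode parent;
      private final byte localPolicyId;       // policy set directly on this node, if any

      PolicyNode(PolicyNode parent, byte localPolicyId) {
        this.parent = parent;
        this.localPolicyId = localPolicyId;
      }

      byte getLocalStoragePolicyID() {
        return localPolicyId;
      }

      byte getStoragePolicyID() {
        // Prefer the locally specified policy; otherwise inherit from the parent.
        byte id = getLocalStoragePolicyID();
        if (id != ID_UNSPECIFIED) {
          return id;
        }
        return parent != null ? parent.getStoragePolicyID() : ID_UNSPECIFIED;
      }

      public static void main(String[] args) {
        PolicyNode root = new PolicyNode(null, (byte) 7);         // policy set on the root
        PolicyNode child = new PolicyNode(root, ID_UNSPECIFIED);  // nothing set locally
        System.out.println(child.getStoragePolicyID());           // 7, inherited from root
      }
    }
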
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 18b4109..f5579ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -26,7 +26,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Directory INode class.
@@ -103,6 +106,30 @@ public class INodeDirectory extends INodeWithAdditionalFields
     return this;
   }
 
+  @Override
+  public byte getLocalStoragePolicyID() {
+    XAttrFeature f = getXAttrFeature();
+    ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
+        .getXAttrs();
+    for (XAttr xattr : xattrs) {
+      if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
+        return (xattr.getValue())[0];
+      }
+    }
+    return BlockStoragePolicy.ID_UNSPECIFIED;
+  }
+
+  @Override
+  public byte getStoragePolicyID() {
+    byte id = getLocalStoragePolicyID();
+    if (id != BlockStoragePolicy.ID_UNSPECIFIED) {
+      return id;
+    }
+    // if it is unspecified, check its parent
+    return getParent() != null ? getParent().getStoragePolicyID() :
+        BlockStoragePolicy.ID_UNSPECIFIED;
+  }
+
   void setQuota(long nsQuota, long dsQuota) {
     DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
     if (quota != null) {

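Note the design split in the INodeDirectory hunk above: a directory keeps its policy in a one-byte system xattr and scans the XAttrFeature on read, whereas a file (see the INodeFile hunk further down) packs the ID into spare header bits. A simplified stand-alone sketch of that xattr scan, with a placeholder attribute name and a hypothetical isStoragePolicyXAttr predicate standing in for the real BlockStoragePolicy helper:

    import java.util.Collections;
    import java.util.List;

    class DirectoryPolicyLookup {
      static final byte ID_UNSPECIFIED = 0;  // stands in for BlockStoragePolicy.ID_UNSPECIFIED
      // Placeholder key; the real system xattr name is defined inside HDFS, not here.
      static final String POLICY_XATTR_NAME = "example.storage.policy.id";

      /** Minimal stand-in for org.apache.hadoop.fs.XAttr. */
      static class XAttr {
        final String name;
        final byte[] value;
        XAttr(String name, byte[] value) { this.name = name; this.value = value; }
      }

      /** Hypothetical predicate; the real check lives in BlockStoragePolicy. */
      static boolean isStoragePolicyXAttr(XAttr xattr) {
        return POLICY_XATTR_NAME.equals(xattr.name);
      }

      /** Mirrors INodeDirectory#getLocalStoragePolicyID: the first matching xattr wins. */
      static byte getLocalStoragePolicyID(List<XAttr> xattrs) {
        for (XAttr xattr : xattrs) {
          if (isStoragePolicyXAttr(xattr)) {
            return xattr.value[0];
          }
        }
        return ID_UNSPECIFIED;
      }

      public static void main(String[] args) {
        List<XAttr> xattrs = Collections.singletonList(
            new XAttr(POLICY_XATTR_NAME, new byte[] { 5 }));
        System.out.println(getLocalStoragePolicyID(xattrs));  // 5
      }
    }
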
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
index b1e7485..f0f58a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * The attributes of an inode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 94fa686..7af2b71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
@@ -71,10 +72,14 @@ public class INodeFile extends INodeWithAdditionalFields
     return inode.asFile();
   }
 
-  /** Format: [16 bits for replication][48 bits for PreferredBlockSize] */
+  /** 
+   * Bit format:
+   * [4-bit storagePolicyID][12-bit replication][48-bit preferredBlockSize]
+   */
   static enum HeaderFormat {
     PREFERRED_BLOCK_SIZE(null, 48, 1),
-    REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 16, 1);
+    REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
+    STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicy.ID_BIT_LENGTH, 0);
 
     private final LongBitFormat BITS;
 
@@ -90,10 +95,16 @@ public class INodeFile extends INodeWithAdditionalFields
       return PREFERRED_BLOCK_SIZE.BITS.retrieve(header);
     }
 
-    static long toLong(long preferredBlockSize, short replication) {
+    static byte getStoragePolicyID(long header) {
+      return (byte)STORAGE_POLICY_ID.BITS.retrieve(header);
+    }
+
+    static long toLong(long preferredBlockSize, short replication,
+        byte storagePolicyID) {
       long h = 0;
       h = PREFERRED_BLOCK_SIZE.BITS.combine(preferredBlockSize, h);
       h = REPLICATION.BITS.combine(replication, h);
+      h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
       return h;
     }
   }
@@ -104,9 +115,10 @@ public class INodeFile extends INodeWithAdditionalFields
 
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
       long atime, BlockInfo[] blklist, short replication,
-      long preferredBlockSize) {
+      long preferredBlockSize, byte storagePolicyID) {
     super(id, name, permissions, mtime, atime);
-    header = HeaderFormat.toLong(preferredBlockSize, replication);
+    header = HeaderFormat.toLong(preferredBlockSize, replication,
+        storagePolicyID);
     this.blocks = blklist;
   }
   
@@ -160,7 +172,6 @@ public class INodeFile extends INodeWithAdditionalFields
     return getFileUnderConstructionFeature() != null;
   }
 
-  /** Convert this file to an {@link INodeFileUnderConstruction}. */
   INodeFile toUnderConstruction(String clientName, String clientMachine) {
     Preconditions.checkState(!isUnderConstruction(),
         "file is already under construction");
@@ -356,6 +367,32 @@ public class INodeFile extends INodeWithAdditionalFields
   }
 
   @Override
+  public byte getLocalStoragePolicyID() {
+    return HeaderFormat.getStoragePolicyID(header);
+  }
+
+  @Override
+  public byte getStoragePolicyID() {
+    byte id = getLocalStoragePolicyID();
+    if (id == BlockStoragePolicy.ID_UNSPECIFIED) {
+      return this.getParent() != null ?
+          this.getParent().getStoragePolicyID() : id;
+    }
+    return id;
+  }
+
+  private void setStoragePolicyID(byte storagePolicyId) {
+    header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(storagePolicyId,
+        header);
+  }
+
+  public final void setStoragePolicyID(byte storagePolicyId,
+      int latestSnapshotId) throws QuotaExceededException {
+    recordModification(latestSnapshotId);
+    setStoragePolicyID(storagePolicyId);
+  }
+
+  @Override
   public long getHeaderLong() {
     return header;
   }

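With the header reorganized to [4-bit storagePolicyID][12-bit replication][48-bit preferredBlockSize], an INodeFile carries its policy at no extra memory cost (replication shrinks from 16 to 12 bits to make room). The sketch below illustrates that packing with plain shifts and masks; it is an illustration of the layout only, not the real LongBitFormat helper, and the sample values are made up:

    /** Pack/unpack helpers mirroring the [4 | 12 | 48] header layout described above. */
    public class FileHeaderLayout {
      private static final int BLOCK_SIZE_BITS = 48;
      private static final int REPLICATION_BITS = 12;
      private static final long BLOCK_SIZE_MASK = (1L << BLOCK_SIZE_BITS) - 1;
      private static final long REPLICATION_MASK = (1L << REPLICATION_BITS) - 1;

      static long toLong(long preferredBlockSize, short replication, byte storagePolicyId) {
        return ((long) storagePolicyId << (BLOCK_SIZE_BITS + REPLICATION_BITS))
            | ((replication & REPLICATION_MASK) << BLOCK_SIZE_BITS)
            | (preferredBlockSize & BLOCK_SIZE_MASK);
      }

      static long getPreferredBlockSize(long header) {
        return header & BLOCK_SIZE_MASK;
      }

      static short getReplication(long header) {
        return (short) ((header >>> BLOCK_SIZE_BITS) & REPLICATION_MASK);
      }

      static byte getStoragePolicyID(long header) {
        return (byte) (header >>> (BLOCK_SIZE_BITS + REPLICATION_BITS));
      }

      public static void main(String[] args) {
        long header = toLong(128L * 1024 * 1024, (short) 3, (byte) 5);
        System.out.println(getPreferredBlockSize(header));  // 134217728
        System.out.println(getReplication(header));         // 3
        System.out.println(getStoragePolicyID(header));     // 5
      }
    }
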
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
index 47b76b7..f9d2700 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
@@ -32,12 +32,14 @@ public interface INodeFileAttributes extends INodeAttributes {
 
   /** @return preferred block size in bytes */
   public long getPreferredBlockSize();
-  
+
   /** @return the header as a long. */
   public long getHeaderLong();
 
   public boolean metadataEquals(INodeFileAttributes other);
 
+  public byte getLocalStoragePolicyID();
+
   /** A copy of the inode file attributes */
   public static class SnapshotCopy extends INodeAttributes.SnapshotCopy
       implements INodeFileAttributes {
@@ -45,10 +47,11 @@ public interface INodeFileAttributes extends INodeAttributes {
 
     public SnapshotCopy(byte[] name, PermissionStatus permissions,
         AclFeature aclFeature, long modificationTime, long accessTime,
-        short replication, long preferredBlockSize, XAttrFeature xAttrsFeature) {
+        short replication, long preferredBlockSize, byte storagePolicyID,
+        XAttrFeature xAttrsFeature) {
       super(name, permissions, aclFeature, modificationTime, accessTime, 
           xAttrsFeature);
-      header = HeaderFormat.toLong(preferredBlockSize, replication);
+      header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID);
     }
 
     public SnapshotCopy(INodeFile file) {
@@ -67,6 +70,11 @@ public interface INodeFileAttributes extends INodeAttributes {
     }
 
     @Override
+    public byte getLocalStoragePolicyID() {
+      return HeaderFormat.getStoragePolicyID(header);
+    }
+
+    @Override
     public long getHeaderLong() {
       return header;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 02c0815..87e4715 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
 import org.apache.hadoop.util.GSet;
@@ -121,6 +122,16 @@ public class INodeMap {
           boolean countDiffChange) throws QuotaExceededException {
         return null;
       }
+
+      @Override
+      public byte getStoragePolicyID() {
+        return BlockStoragePolicy.ID_UNSPECIFIED;
+      }
+
+      @Override
+      public byte getLocalStoragePolicyID() {
+        return BlockStoragePolicy.ID_UNSPECIFIED;
+      }
     };
       
     return map.get(inode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
index 9bd2ad0..a4766d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
@@ -286,6 +286,16 @@ public abstract class INodeReference extends INode {
   }
 
   @Override
+  public final byte getStoragePolicyID() {
+    return referred.getStoragePolicyID();
+  }
+
+  @Override
+  public final byte getLocalStoragePolicyID() {
+    return referred.getLocalStoragePolicyID();
+  }
+
+  @Override
   final void recordModification(int latestSnapshotId)
       throws QuotaExceededException {
     referred.recordModification(latestSnapshotId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
index 6729cd2..45a4bc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
@@ -145,4 +145,16 @@ public class INodeSymlink extends INodeWithAdditionalFields {
   public void addXAttrFeature(XAttrFeature f) {
     throw new UnsupportedOperationException("XAttrs are not supported on symlinks");
   }
+
+  @Override
+  public byte getStoragePolicyID() {
+    throw new UnsupportedOperationException(
+        "Storage policy are not supported on symlinks");
+  }
+
+  @Override
+  public byte getLocalStoragePolicyID() {
+    throw new UnsupportedOperationException(
+        "Storage policy are not supported on symlinks");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index 404e205..512913b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -68,7 +68,8 @@ public class NameNodeLayoutVersion {
     XATTRS(-57, "Extended attributes"),
     CREATE_OVERWRITE(-58, "Use single editlog record for " +
       "creating file with overwrite"),
-    XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces");
+    XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
+    BLOCK_STORAGE_POLICY(-60, "Block storage policy");
     
     private final FeatureInfo info;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 25cf27f..006c803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -580,7 +580,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
     throws IOException {  
     return namesystem.setReplication(src, replication);
   }
-    
+
+  @Override
+  public void setStoragePolicy(String src, String policyName)
+      throws IOException {
+    namesystem.setStoragePolicy(src, policyName);
+  }
+
   @Override // ClientProtocol
   public void setPermission(String src, FsPermission permissions)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 3f4cda5..ff33225 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -220,7 +220,8 @@ public class FSImageFormatPBSnapshot {
           copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
               .toByteArray(), permission, acl, fileInPb.getModificationTime(),
               fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
-              fileInPb.getPreferredBlockSize(), xAttrs);
+              fileInPb.getPreferredBlockSize(),
+              (byte)fileInPb.getStoragePolicyID(), xAttrs);
         }
 
         FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index 3949fbd..f8c0fc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -26,7 +26,6 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -223,11 +222,8 @@ public class NamenodeWebHdfsMethods {
       final DatanodeDescriptor clientNode = bm.getDatanodeManager(
           ).getDatanodeByHost(getRemoteAddress());
       if (clientNode != null) {
-        final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
-            .chooseTarget(path, 1, clientNode,
-                new ArrayList<DatanodeStorageInfo>(), false, excludes, blocksize,
-                // TODO: get storage type from the file
-                StorageType.DEFAULT);
+        final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
+            path, clientNode, excludes, blocksize);
         if (storages.length > 0) {
           return storages[0].getDatanodeDescriptor();
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 580d9c5..6164ee9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.tools;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
@@ -48,6 +49,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.shell.Command;
 import org.apache.hadoop.fs.shell.CommandFormat;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -63,23 +65,24 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
-import org.apache.hadoop.ipc.GenericRefreshProtocol;
 import org.apache.hadoop.ipc.RefreshResponse;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -390,6 +393,8 @@ public class DFSAdmin extends FsShell {
     "\t[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]\n" +
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
+    "\t[-setStoragePolicy path policyName]\n" +
+    "\t[-getStoragePolicy path]\n" +
     "\t[-help [cmd]]\n";
 
   /**
@@ -595,6 +600,32 @@ public class DFSAdmin extends FsShell {
     return inSafeMode;
   }
 
+  public int setStoragePolicy(String[] argv) throws IOException {
+    DistributedFileSystem dfs = getDFS();
+    dfs.setStoragePolicy(new Path(argv[1]), argv[2]);
+    System.out.println("Set storage policy " + argv[2] + " on " + argv[1]);
+    return 0;
+  }
+
+  public int getStoragePolicy(String[] argv) throws IOException {
+    DistributedFileSystem dfs = getDFS();
+    HdfsFileStatus status = dfs.getClient().getFileInfo(argv[1]);
+    if (status == null) {
+      throw new FileNotFoundException("File/Directory does not exist: "
+          + argv[1]);
+    }
+    byte storagePolicyId = status.getStoragePolicy();
+    BlockStoragePolicy.Suite suite = BlockStoragePolicy
+        .readBlockStorageSuite(getConf());
+    BlockStoragePolicy policy = suite.getPolicy(storagePolicyId);
+    if (policy != null) {
+      System.out.println("The storage policy of " + argv[1] + ":\n" + policy);
+      return 0;
+    } else {
+      throw new IOException("Cannot identify the storage policy for " + argv[1]);
+    }
+  }
+
   /**
    * Allow snapshot on a directory.
    * Usage: java DFSAdmin -allowSnapshot snapshotDir
@@ -941,7 +972,13 @@ public class DFSAdmin extends FsShell {
     String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
         + "\tGet the information about the given datanode. This command can\n"
         + "\tbe used for checking if a datanode is alive.\n";
-    
+
+    String setStoragePolicy = "-setStoragePolicy path policyName\n"
+        + "\tSet the storage policy for a file/directory.\n";
+
+    String getStoragePolicy = "-getStoragePolicy path\n"
+        + "\tGet the storage policy for a file/directory.\n";
+
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -1001,6 +1038,10 @@ public class DFSAdmin extends FsShell {
       System.out.println(shutdownDatanode);
     } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
       System.out.println(getDatanodeInfo);
+    } else if ("setStoragePolicy".equalsIgnoreCase(cmd))  {
+      System.out.println(setStoragePolicy);
+    } else if ("getStoragePolicy".equalsIgnoreCase(cmd))  {
+      System.out.println(getStoragePolicy);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -1033,6 +1074,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(disallowSnapshot);
       System.out.println(shutdownDatanode);
       System.out.println(getDatanodeInfo);
+      System.out.println(setStoragePolicy);
+      System.out.println(getStoragePolicy);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -1461,6 +1504,12 @@ public class DFSAdmin extends FsShell {
     } else if ("-safemode".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-safemode enter | leave | get | wait]");
+    } else if ("-setStoragePolicy".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+          + " [-setStoragePolicy path policyName]");
+    } else if ("-getStoragePolicy".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+          + " [-getStoragePolicy path]");
     } else if ("-allowSnapshot".equalsIgnoreCase(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-allowSnapshot <snapshotDir>]");
@@ -1677,6 +1726,16 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-setStoragePolicy".equals(cmd)) {
+      if (argv.length != 3) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    } else if ("-getStoragePolicy".equals(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }
     
     // initialize DFSAdmin
@@ -1750,6 +1809,10 @@ public class DFSAdmin extends FsShell {
         exitCode = getDatanodeInfo(argv, i);
       } else if ("-reconfig".equals(cmd)) {
         exitCode = reconfig(argv, i);
+      } else if ("-setStoragePolicy".equals(cmd)) {
+        exitCode = setStoragePolicy(argv);
+      } else if ("-getStoragePolicy".equals(cmd)) {
+        exitCode = getStoragePolicy(argv);
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);


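The DFSAdmin change threads the two new subcommands through the usage banner, per-command help, argument-count validation, and dispatch. A sketch of exercising them programmatically through ToolRunner (roughly the path the hdfs dfsadmin script takes); the path and policy name below are placeholders:

    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class StoragePolicyAdminExample {
      public static void main(String[] args) throws Exception {
        DFSAdmin admin = new DFSAdmin(new HdfsConfiguration());
        // Equivalent to: hdfs dfsadmin -setStoragePolicy /data/archive COLD
        int rc = ToolRunner.run(admin,
            new String[] {"-setStoragePolicy", "/data/archive", "COLD"});
        // Equivalent to: hdfs dfsadmin -getStoragePolicy /data/archive
        if (rc == 0) {
          rc = ToolRunner.run(admin, new String[] {"-getStoragePolicy", "/data/archive"});
        }
        System.exit(rc);
      }
    }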