hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [37/39] hadoop git commit: HDFS-13215. RBF: Move Router to its own module. Contributed by Wei Yan
Date Tue, 20 Mar 2018 05:19:03 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java
new file mode 100644
index 0000000..af9f493
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface to map a file path in the global name space to a specific
+ * subcluster and path in an HDFS name space.
+ * <p>
+ * Each path in the global/federated namespace may map to 1-N different HDFS
+ * locations.  Each location specifies a single nameservice and a single HDFS
+ * path.  The behavior is similar to MergeFS and Nfly and allows the merger
+ * of multiple HDFS locations into a single path.  See HADOOP-8298 and
+ * HADOOP-12077.
+ * <p>
+ * For example, a directory listing will fetch listings for each destination
+ * path and combine them into a single set of results.
+ * <p>
+ * When multiple destinations are available for a path, the destinations are
+ * prioritized in a consistent manner.  This allows the proxy server to
+ * guess the best/most likely destination and attempt it first.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FileSubclusterResolver {
+
+  /**
+   * Get the destinations for a global path. Results are from the mount table
+   * cache.  If multiple destinations are available, the first result is the
+   * highest priority destination.
+   *
+   * @param path Global path.
+   * @return Location in a destination namespace or null if it does not exist.
+   * @throws IOException If the data is not available.
+   */
+  PathLocation getDestinationForPath(String path) throws IOException;
+
+  /**
+   * Get the mount points for a path, read from the mount table cache.
+   *
+   * @param path Path to get the mount points for.
+   * @return List of mount points present at this path or zero-length list if
+   *         none are found.
+   * @throws IOException If the data is not available.
+   */
+  List<String> getMountPoints(String path) throws IOException;
+
+  /**
+   * Get the default namespace for the cluster.
+   *
+   * @return Default namespace identifier.
+   */
+  String getDefaultNamespace();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
new file mode 100644
index 0000000..b87eeec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE;
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED;
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a cached lookup of the most recently active namenode for a
+ * particular nameservice. Relies on the {@link StateStoreService} to
+ * discover available nameservices and namenodes.
+ */
+public class MembershipNamenodeResolver
+    implements ActiveNamenodeResolver, StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MembershipNamenodeResolver.class);
+
+  /** Reference to the State Store. */
+  private final StateStoreService stateStore;
+  /** Membership State Store interface. */
+  private MembershipStore membershipInterface;
+
+  /** Parent router ID. */
+  private String routerId;
+
+  /** Cached lookup of NN for nameservice. Invalidated on cache refresh. */
+  private Map<String, List<? extends FederationNamenodeContext>> cacheNS;
+  /** Cached lookup of NN for block pool. Invalidated on cache refresh. */
+  private Map<String, List<? extends FederationNamenodeContext>> cacheBP;
+
+
+  public MembershipNamenodeResolver(
+      Configuration conf, StateStoreService store) throws IOException {
+    this.stateStore = store; // may be null; registration is then skipped
+
+    this.cacheNS = new ConcurrentHashMap<>();
+    this.cacheBP = new ConcurrentHashMap<>();
+
+    if (this.stateStore != null) {
+      // Register this resolver with the State Store to request cache updates
+      this.stateStore.registerCacheExternal(this);
+    }
+  }
+
+  private synchronized MembershipStore getMembershipStore() throws IOException {
+    if (this.membershipInterface == null) { // looked up once, then reused
+      this.membershipInterface = this.stateStore.getRegisteredRecordStore(
+          MembershipStore.class); // NOTE(review): NPE if stateStore is null
+      if (this.membershipInterface == null) {
+        throw new IOException("State Store does not have an interface for " +
+            MembershipStore.class.getSimpleName());
+      }
+    }
+    return this.membershipInterface;
+  }
+
+  @Override
+  public boolean loadCache(boolean force) {
+    // Our caches are built from the membership store, so refresh it first
+    try {
+      MembershipStore membership = getMembershipStore();
+      membership.loadCache(force);
+    } catch (IOException e) { // best effort: the caches are still cleared
+      LOG.error("Cannot update membership from the State Store", e);
+    }
+
+    // Drop both lookup caches; they are refilled on the next lookup miss
+    cacheBP.clear();
+    cacheNS.clear();
+    return true; // reported even if the store refresh above failed
+  }
+
+  @Override
+  public void updateActiveNamenode(
+      final String nsId, final InetSocketAddress address) throws IOException {
+
+    // Called when we have an RPC miss and successful hit on an alternate NN.
+    // Temporarily update our cache, it will be overwritten on the next update.
+    try {
+      MembershipState partial = MembershipState.newInstance();
+      String rpcAddress = address.getHostName() + ":" + address.getPort();
+      partial.setRpcAddress(rpcAddress);
+      partial.setNameserviceId(nsId); // query key: RPC address + nameservice
+
+      GetNamenodeRegistrationsRequest request =
+          GetNamenodeRegistrationsRequest.newInstance(partial);
+
+      MembershipStore membership = getMembershipStore();
+      GetNamenodeRegistrationsResponse response =
+          membership.getNamenodeRegistrations(request);
+      List<MembershipState> records = response.getNamenodeMemberships();
+
+      if (records != null && records.size() == 1) { // exactly one match only
+        MembershipState record = records.get(0);
+        UpdateNamenodeRegistrationRequest updateRequest =
+            UpdateNamenodeRegistrationRequest.newInstance(
+                record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
+        membership.updateNamenodeRegistration(updateRequest);
+      }
+    } catch (StateStoreUnavailableException e) { // logged, not rethrown
+      LOG.error("Cannot update {} as active, State Store unavailable", address);
+    }
+  }
+
+  @Override
+  public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
+      final String nsId) throws IOException {
+
+    List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
+    if (ret == null) { // cache miss: query the membership store
+      try {
+        MembershipState partial = MembershipState.newInstance();
+        partial.setNameserviceId(nsId);
+        GetNamenodeRegistrationsRequest request =
+            GetNamenodeRegistrationsRequest.newInstance(partial);
+
+        final List<MembershipState> result =
+            getRecentRegistrationForQuery(request, true, false); // no EXPIRED
+        if (result == null || result.isEmpty()) {
+          LOG.error("Cannot locate eligible NNs for {}", nsId);
+          return null; // empty results are not cached
+        } else {
+          cacheNS.put(nsId, result);
+          ret = result;
+        }
+      } catch (StateStoreUnavailableException e) { // falls through to null
+        LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
+      }
+    }
+    if (ret == null) { // only reachable after the catch above
+      return null;
+    }
+    return Collections.unmodifiableList(ret);
+  }
+
+  @Override
+  public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
+      final String bpId) throws IOException {
+
+    List<? extends FederationNamenodeContext> ret = cacheBP.get(bpId);
+    if (ret == null) { // cache miss: query the membership store
+      try {
+        MembershipState partial = MembershipState.newInstance();
+        partial.setBlockPoolId(bpId);
+        GetNamenodeRegistrationsRequest request =
+            GetNamenodeRegistrationsRequest.newInstance(partial);
+
+        final List<MembershipState> result =
+            getRecentRegistrationForQuery(request, true, false); // no EXPIRED
+        if (result == null || result.isEmpty()) { // not cached, ret stays null
+          LOG.error("Cannot locate eligible NNs for {}", bpId);
+        } else {
+          cacheBP.put(bpId, result);
+          ret = result;
+        }
+      } catch (StateStoreUnavailableException e) { // logged, not rethrown
+        LOG.error("Cannot get active NN for {}, State Store unavailable", bpId);
+        return null;
+      }
+    }
+    if (ret == null) { // no eligible registration was found
+      return null;
+    }
+    return Collections.unmodifiableList(ret);
+  }
+
+  @Override
+  public boolean registerNamenode(NamenodeStatusReport report)
+      throws IOException {
+
+    if (this.routerId == null) { // assigned through setRouterId()
+      LOG.warn("Cannot register namenode, router ID is not known {}", report);
+      return false;
+    }
+
+    MembershipState record = MembershipState.newInstance(
+        routerId, report.getNameserviceId(), report.getNamenodeId(),
+        report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
+        report.getServiceAddress(), report.getLifelineAddress(),
+        report.getWebAddress(), report.getState(), report.getSafemode());
+
+    if (report.statsValid()) { // otherwise the record carries no stats
+      MembershipStats stats = MembershipStats.newInstance();
+      stats.setNumOfFiles(report.getNumFiles());
+      stats.setNumOfBlocks(report.getNumBlocks());
+      stats.setNumOfBlocksMissing(report.getNumBlocksMissing());
+      stats.setNumOfBlocksPendingReplication(
+          report.getNumOfBlocksPendingReplication());
+      stats.setNumOfBlocksUnderReplicated(
+          report.getNumOfBlocksUnderReplicated());
+      stats.setNumOfBlocksPendingDeletion(
+          report.getNumOfBlocksPendingDeletion());
+      stats.setAvailableSpace(report.getAvailableSpace());
+      stats.setTotalSpace(report.getTotalSpace());
+      stats.setProvidedSpace(report.getProvidedSpace());
+      stats.setNumOfDecommissioningDatanodes(
+          report.getNumDecommissioningDatanodes());
+      stats.setNumOfActiveDatanodes(report.getNumLiveDatanodes());
+      stats.setNumOfDeadDatanodes(report.getNumDeadDatanodes());
+      stats.setNumOfDecomActiveDatanodes(report.getNumDecomLiveDatanodes());
+      stats.setNumOfDecomDeadDatanodes(report.getNumDecomDeadDatanodes());
+      record.setStats(stats);
+    }
+
+    if (report.getState() != UNAVAILABLE) {
+      // Set/update our last contact time; not touched for UNAVAILABLE
+      record.setLastContact(Time.now());
+    }
+
+    NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
+    request.setNamenodeMembership(record); // record is sent as a heartbeat
+    return getMembershipStore().namenodeHeartbeat(request).getResult();
+  }
+
+  @Override
+  public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
+    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+    GetNamespaceInfoResponse response =
+        getMembershipStore().getNamespaceInfo(request); // store's own answer
+    return response.getNamespaceInfo();
+  }
+
+  /**
+   * Get the registrations that match the query, sorted by priority. The
+   * registrations returned by the membership store are filtered by state
+   * (EXPIRED and UNAVAILABLE ones are dropped unless the matching flag is
+   * set) and then sorted with {@link NamenodePriorityComparator}; the
+   * resulting order is the one defined by that comparator.
+   *
+   * @param request The select query for NN registrations.
+   * @param addUnavailable include UNAVAILABLE registrations.
+   * @param addExpired include EXPIRED registrations.
+   * @return Sorted list of the memberships that match both the query AND
+   *         the selected states; an empty list (not null) if there are none.
+   * @throws IOException If the membership store is missing or the query fails.
+   */
+  private List<MembershipState> getRecentRegistrationForQuery(
+      GetNamenodeRegistrationsRequest request, boolean addUnavailable,
+      boolean addExpired) throws IOException {
+
+    // Retrieve a list of all registrations that match this query.
+    // This may include all NN records for a namespace/blockpool, including
+    // duplicate records for the same NN from different routers.
+    MembershipStore membershipStore = getMembershipStore();
+    GetNamenodeRegistrationsResponse response =
+        membershipStore.getNamenodeRegistrations(request);
+
+    List<MembershipState> memberships = response.getNamenodeMemberships();
+    if (!addExpired || !addUnavailable) { // some state must be filtered out
+      // NOTE(review): removes from the response's own list; assumes non-null
+      Iterator<MembershipState> iterator = memberships.iterator();
+      while (iterator.hasNext()) {
+        MembershipState membership = iterator.next();
+        if (membership.getState() == EXPIRED && !addExpired) {
+          iterator.remove();
+        } else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
+          iterator.remove();
+        }
+      }
+    }
+
+    List<MembershipState> priorityList = new ArrayList<>();
+    priorityList.addAll(memberships);
+    Collections.sort(priorityList, new NamenodePriorityComparator());
+
+    LOG.debug("Selected most recent NN {} for query", priorityList);
+    return priorityList;
+  }
+
+  @Override
+  public void setRouterId(String router) {
+    this.routerId = router; // registerNamenode() rejects reports while null
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java
new file mode 100644
index 0000000..c2e4a5b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+
+/**
+ * Manage a mount table.
+ */
+public interface MountTableManager {
+
+  /**
+   * Add an entry to the mount table.
+   *
+   * @param request Fully populated request object.
+   * @return Response reporting whether the mount table entry was
+   *         successfully committed to the data store.
+   * @throws IOException If the data store is not initialized.
+   */
+  AddMountTableEntryResponse addMountTableEntry(
+      AddMountTableEntryRequest request) throws IOException;
+
+  /**
+   * Update an existing entry in the mount table.
+   *
+   * @param request Fully populated request object.
+   * @return Response reporting whether the mount table entry was
+   *         successfully committed to the data store.
+   * @throws IOException If the data store is not initialized.
+   */
+  UpdateMountTableEntryResponse updateMountTableEntry(
+      UpdateMountTableEntryRequest request) throws IOException;
+
+  /**
+   * Remove an entry from the mount table.
+   *
+   * @param request Fully populated request object.
+   * @return Response reporting whether the entry was removed from the store.
+   * @throws IOException If the data store is not initialized.
+   */
+  RemoveMountTableEntryResponse removeMountTableEntry(
+      RemoveMountTableEntryRequest request) throws IOException;
+
+  /**
+   * List all mount table entries present at or below the path. Fetches from the
+   * state store.
+   *
+   * @param request Fully populated request object.
+   *
+   * @return Response with all mount table entries under the path. Zero-length
+   *         list if none are found.
+   * @throws IOException If the data store cannot be queried.
+   */
+  GetMountTableEntriesResponse getMountTableEntries(
+      GetMountTableEntriesRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
new file mode 100644
index 0000000..9713138
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Mount table to map between global paths and remote locations. This allows the
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} to map
+ * the global HDFS view to the remote namespaces. This is similar to
+ * {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
+ * This is implemented as a tree.
+ */
+public class MountTableResolver
+    implements FileSubclusterResolver, StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MountTableResolver.class);
+
+  /** Reference to Router. */
+  private final Router router;
+  /** Reference to the State Store. */
+  private final StateStoreService stateStore;
+  /** Interface to the mount table store. */
+  private MountTableStore mountTableStore;
+
+  /** If the tree has been initialized. */
+  private boolean init = false;
+  /** Path -> Remote HDFS location. */
+  private final TreeMap<String, MountTable> tree = new TreeMap<>();
+  /** Path -> Remote location. */
+  private final Cache<String, PathLocation> locationCache;
+
+  /** Default nameservice when no mount matches the path. */
+  private String defaultNameService = "";
+
+  /** Synchronization for both the tree and the cache. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+
+  @VisibleForTesting
+  public MountTableResolver(Configuration conf) {
+    this(conf, (StateStoreService)null); // cast selects the store overload
+  }
+
+  public MountTableResolver(Configuration conf, Router routerService) {
+    this(conf, routerService, null); // store is then taken from the router
+  }
+
+  public MountTableResolver(Configuration conf, StateStoreService store) {
+    this(conf, null, store); // no Router reference in this case
+  }
+
+  public MountTableResolver(Configuration conf, Router routerService,
+      StateStoreService store) {
+    this.router = routerService;
+    if (store != null) { // an explicit store takes precedence
+      this.stateStore = store;
+    } else if (this.router != null) { // otherwise use the router's store
+      this.stateStore = this.router.getStateStore();
+    } else {
+      this.stateStore = null;
+    }
+
+    int maxCacheSize = conf.getInt(
+        FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE,
+        FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT);
+    this.locationCache = CacheBuilder.newBuilder()
+        .maximumSize(maxCacheSize) // bounds the path -> location cache
+        .build();
+
+    registerCacheExternal(); // does nothing without a State Store
+    initDefaultNameService(conf);
+  }
+
+  /**
+   * Request cache updates from the State Store, if this resolver has one.
+   */
+  private void registerCacheExternal() {
+    if (this.stateStore != null) {
+      this.stateStore.registerCacheExternal(this);
+    }
+  }
+
+  /**
+   * Set the nameservice for APIs that cannot be resolved to a specific one.
+   *
+   * @param conf Configuration for this resolver.
+   */
+  private void initDefaultNameService(Configuration conf) {
+    try {
+      this.defaultNameService = conf.get(
+          DFS_ROUTER_DEFAULT_NAMESERVICE,
+          DFSUtil.getNamenodeNameServiceId(conf)); // default if key is unset
+    } catch (HadoopIllegalArgumentException e) {
+      LOG.error("Cannot find default name service, setting it to the first");
+      Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
+      this.defaultNameService = nsIds.iterator().next(); // throws if empty
+      LOG.info("Default name service: {}", this.defaultNameService);
+    }
+  }
+
+  /**
+   * Get a reference for the Router for this resolver.
+   *
+   * @return Router for this resolver, or null if none was provided.
+   */
+  protected Router getRouter() {
+    return this.router;
+  }
+
+  /**
+   * Get the mount table store for this resolver.
+   *
+   * @return Mount table store.
+   * @throws IOException If the State Store has no MountTableStore interface.
+   */
+  protected MountTableStore getMountTableStore() throws IOException {
+    if (this.mountTableStore == null) { // lazy lookup, not synchronized
+      this.mountTableStore = this.stateStore.getRegisteredRecordStore(
+          MountTableStore.class); // NOTE(review): NPE if stateStore is null
+      if (this.mountTableStore == null) {
+        throw new IOException("State Store does not have an interface for " +
+            MountTableStore.class);
+      }
+    }
+    return this.mountTableStore;
+  }
+
+  /**
+   * Add a mount entry to the table.
+   *
+   * @param entry The mount table record to add from the state store.
+   */
+  public void addEntry(final MountTable entry) {
+    writeLock.lock();
+    try {
+      String srcPath = entry.getSourcePath();
+      this.tree.put(srcPath, entry); // replaces any entry with the same path
+      invalidateLocationCache(srcPath);
+    } finally {
+      writeLock.unlock();
+    }
+    this.init = true; // NOTE(review): written outside the write lock
+  }
+
+  /**
+   * Remove a mount table entry.
+   *
+   * @param srcPath Source path for the entry to remove.
+   */
+  public void removeEntry(final String srcPath) {
+    writeLock.lock();
+    try {
+      this.tree.remove(srcPath); // no effect if the path is not in the tree
+      invalidateLocationCache(srcPath);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Invalidates all cache entries below this path. Callers hold the write lock.
+   *
+   * @param path Source path.
+   */
+  private void invalidateLocationCache(final String path) {
+    LOG.debug("Invalidating {} from {}", path, locationCache);
+    if (locationCache.size() == 0) { // nothing cached, nothing to do
+      return;
+    }
+
+    // Walk the live map view of the cache and remove the entries under path
+    ConcurrentMap<String, PathLocation> map = locationCache.asMap();
+    Set<Entry<String, PathLocation>> entries = map.entrySet();
+    Iterator<Entry<String, PathLocation>> it = entries.iterator();
+    while (it.hasNext()) {
+      Entry<String, PathLocation> entry = it.next();
+      PathLocation loc = entry.getValue();
+      String src = loc.getSourcePath();
+      if (src != null) {
+        if (isParentEntry(src, path)) {
+          LOG.debug("Removing {}", src);
+          it.remove();
+        }
+      } else { // no source path: match on the default destination instead
+        String dest = loc.getDefaultLocation().getDest();
+        if (dest.startsWith(path)) { // plain string-prefix match
+          LOG.debug("Removing default cache {}", dest);
+          it.remove();
+        }
+      }
+    }
+
+    LOG.debug("Location cache after invalidation: {}", locationCache);
+  }
+
+  /**
+   * Updates the mount path tree with a new set of mount table entries: stale
+   * paths are removed, new paths are added and changed records are replaced.
+   * It also updates the needed caches.
+   *
+   * @param entries Full set of mount table entries to update.
+   */
+  @VisibleForTesting
+  public void refreshEntries(final Collection<MountTable> entries) {
+    // The tree read/write must be atomic
+    writeLock.lock();
+    try {
+      // Index the incoming records by source path
+      Map<String, MountTable> incoming = new ConcurrentHashMap<>();
+      for (MountTable mount : entries) {
+        incoming.put(mount.getSourcePath(), mount);
+      }
+
+      // Paths now in the tree (reversed to sort from the leaves to the root)
+      Set<String> current = new TreeSet<>(Collections.reverseOrder());
+      for (MountTable mount : getTreeValues("/")) {
+        current.add(mount.getSourcePath());
+      }
+
+      // Drop the paths that are missing from the incoming set
+      for (String stalePath : current) {
+        if (incoming.containsKey(stalePath)) {
+          continue;
+        }
+        this.tree.remove(stalePath);
+        invalidateLocationCache(stalePath);
+        LOG.info("Removed stale mount point {} from resolver", stalePath);
+      }
+
+      // Add the new paths and replace the records that changed
+      for (MountTable mount : entries) {
+        String srcPath = mount.getSourcePath();
+        if (!current.contains(srcPath)) {
+          // Add node, it does not exist
+          this.tree.put(srcPath, mount);
+          invalidateLocationCache(srcPath);
+          LOG.info("Added new mount point {} to resolver", srcPath);
+          continue;
+        }
+        // Node exists, only rewrite it if the record differs
+        MountTable previous = this.tree.get(srcPath);
+        if (previous != null && !previous.equals(mount)) {
+          LOG.info("Entry has changed from \"{}\" to \"{}\"",
+              previous, mount);
+          this.tree.put(srcPath, mount);
+          invalidateLocationCache(srcPath);
+          LOG.info("Updated mount point {} in resolver", srcPath);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    this.init = true;
+  }
+
+  /**
+   * Reloads the mount table from the State Store and rebuilds the in-memory
+   * tree; returns false (after logging) if the store cannot be read.
+   */
+  @Override
+  public boolean loadCache(boolean force) {
+    try {
+      // This resolver mirrors the store, so the store is refreshed first
+      MountTableStore store = this.getMountTableStore();
+      store.loadCache(force);
+
+      // Read every record under the root and rebuild the tree from them
+      GetMountTableEntriesRequest rootRequest =
+          GetMountTableEntriesRequest.newInstance("/");
+      GetMountTableEntriesResponse rootResponse =
+          store.getMountTableEntries(rootRequest);
+      refreshEntries(rootResponse.getEntries());
+      return true;
+    } catch (IOException e) {
+      LOG.error("Cannot fetch mount table entries from State Store", e);
+      return false;
+    }
+  }
+
+  /**
+   * Clears the location cache and the tree; the init flag is left as it is.
+   */
+  public void clear() {
+    LOG.info("Clearing all mount location caches");
+    writeLock.lock();
+    try {
+      this.locationCache.invalidateAll();
+      this.tree.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public PathLocation getDestinationForPath(final String path)
+      throws IOException {
+    verifyMountTable();
+    final Callable<PathLocation> loader = new Callable<PathLocation>() {
+      @Override
+      public PathLocation call() throws Exception {
+        return lookupLocation(path);
+      }
+    };
+    readLock.lock(); // lookupLocation() expects the read lock to be held
+    try {
+      return this.locationCache.get(path, loader);
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Resolve a path against the mount table, falling back to the same path
+   * in the default name service when no mount point covers it. The caller
+   * must hold the read lock.
+   *
+   * @param path Path to check/insert.
+   * @return New remote location.
+   */
+  public PathLocation lookupLocation(final String path) {
+    MountTable entry = findDeepest(path);
+    if (entry != null) {
+      // A mount point covers the path, translate it through that entry
+      return buildLocation(path, entry);
+    }
+
+    // Not found, use the default location. Its source path is left null,
+    // which is how invalidateLocationCache() recognizes such entries.
+    RemoteLocation fallback = new RemoteLocation(defaultNameService, path);
+    List<RemoteLocation> locations = Collections.singletonList(fallback);
+    return new PathLocation(null, locations);
+  }
+
+  /**
+   * Get the mount table entry for a path.
+   *
+   * @param path Path to look for.
+   * @return Mount table entry the path belongs to, as found by findDeepest.
+   * @throws IOException If the Mount Table is not initialized yet.
+   */
+  public MountTable getMountPoint(final String path) throws IOException {
+    verifyMountTable();
+    return findDeepest(path);
+  }
+
+  @Override
+  public List<String> getMountPoints(final String path) throws IOException {
+    verifyMountTable();
+
+    // Sorted, de-duplicated names of the immediate children
+    Set<String> names = new TreeSet<>();
+    boolean found = false;
+    readLock.lock();
+    try {
+      // Keys prefixed by the path sort within [path, path + MAX_VALUE)
+      SortedMap<String, MountTable> below =
+          this.tree.subMap(path, path + Character.MAX_VALUE);
+      boolean isRoot = path.equals(Path.SEPARATOR);
+
+      for (String mountPath : below.keySet()) {
+        // Strip the prefix; for the root the key is used as it is
+        String rest = isRoot ? mountPath : mountPath.substring(path.length());
+        if (rest.isEmpty()) {
+          // This is a mount point but without children
+          found = true;
+          continue;
+        }
+        if (!rest.startsWith(Path.SEPARATOR)) {
+          // Key only shares a name prefix (e.g. /ab for /a), not a child
+          continue;
+        }
+        found = true;
+
+        // We only return immediate children: first component after the /
+        String name = rest.substring(1);
+        int cut = name.indexOf(Path.SEPARATOR);
+        if (cut > -1) {
+          name = name.substring(0, cut);
+        }
+        if (!name.isEmpty()) {
+          names.add(name);
+        }
+      }
+
+      // Null (not an empty list) when nothing is mounted at or below path
+      if (!found) {
+        return null;
+      }
+      return new LinkedList<>(names);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get all the mount records at or beneath a given path.
+   * @param path Path to get the mount points from.
+   * @return List of mount table records under the path; the list is empty
+   *         (getTreeValues never returns null) when no record matches.
+   * @throws IOException If the Mount Table is not initialized yet.
+   */
+  public List<MountTable> getMounts(final String path) throws IOException {
+    verifyMountTable();
+
+    return getTreeValues(path, false);
+  }
+
+  /**
+   * Check if the Mount Table is ready to be used.
+   * @throws StateStoreUnavailableException If the init flag was never set.
+   */
+  private void verifyMountTable() throws StateStoreUnavailableException {
+    if (!this.init) {
+      throw new StateStoreUnavailableException("Mount Table not initialized");
+    }
+  }
+
+  @Override
+  public String toString() {
+    readLock.lock(); // Read lock held while the tree renders itself
+    try {
+      return this.tree.toString();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Build a location for this result beneath the discovered mount point.
+   *
+   * @param path Path to build for.
+   * @param entry Mount table entry.
+   * @return PathLocation containing the namespace, local path.
+   */
+  private static PathLocation buildLocation(
+      final String path, final MountTable entry) {
+
+    String srcPath = entry.getSourcePath();
+    if (!path.startsWith(srcPath)) {
+      LOG.error("Cannot build location, {} not a child of {}", path, srcPath);
+      return null;
+    }
+    String remainingPath = path.substring(srcPath.length());
+    if (remainingPath.startsWith(Path.SEPARATOR)) {
+      remainingPath = remainingPath.substring(1);
+    }
+
+    List<RemoteLocation> locations = new LinkedList<>();
+    for (RemoteLocation oneDst : entry.getDestinations()) {
+      String nsId = oneDst.getNameserviceId();
+      String dest = oneDst.getDest();
+      String newPath = dest;
+      if (!newPath.endsWith(Path.SEPARATOR) && !remainingPath.isEmpty()) {
+        newPath += Path.SEPARATOR;
+      }
+      newPath += remainingPath;
+      RemoteLocation remoteLocation = new RemoteLocation(nsId, newPath);
+      locations.add(remoteLocation);
+    }
+    DestinationOrder order = entry.getDestOrder();
+    return new PathLocation(srcPath, locations, order);
+  }
+
+  @Override
+  public String getDefaultNamespace() {
+    return this.defaultNameService; // Used when no mount point matches
+  }
+
+  /**
+   * Find the deepest mount point for a path, walking the keys downwards.
+   * @param path Path to look for.
+   * @return Mount table entry, or null if no key is a parent of the path.
+   */
+  private MountTable findDeepest(final String path) {
+    readLock.lock();
+    try {
+      Entry<String, MountTable> candidate = this.tree.floorEntry(path);
+      while (candidate != null) {
+        if (isParentEntry(path, candidate.getKey())) {
+          return candidate.getValue();
+        }
+        candidate = this.tree.lowerEntry(candidate.getKey());
+      }
+      return null;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the mount table entries under a path, in the tree's own key order.
+   * @param path Path to search from.
+   * @return Mount Table entries.
+   */
+  private List<MountTable> getTreeValues(final String path) {
+    return getTreeValues(path, false);
+  }
+
+  /**
+   * Get the mount table entries under a path.
+   * @param path Path to search from.
+   * @param reverse If the order should be reversed.
+   * @return Mount Table entries.
+   */
+  private List<MountTable> getTreeValues(final String path, boolean reverse) {
+    LinkedList<MountTable> ret = new LinkedList<>();
+    readLock.lock();
+    try {
+      String from = path;
+      String to = path + Character.MAX_VALUE;
+      SortedMap<String, MountTable> subMap = this.tree.subMap(from, to);
+      for (MountTable entry : subMap.values()) {
+        if (!reverse) {
+          ret.add(entry);
+        } else {
+          ret.addFirst(entry);
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return ret;
+  }
+
+  /**
+   * Get the size of the cache.
+   * @return Size reported by the location cache.
+   */
+  protected long getCacheSize() {
+    return this.locationCache.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
new file mode 100644
index 0000000..8dd73ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.io.IOException;
+import java.util.EnumMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.OrderedResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.RandomResolver;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Mount table resolver that supports multiple locations for each mount entry.
+ * The returned location contains prioritized remote paths from highest priority
+ * to the lowest priority. Multiple locations for a mount point are optional.
+ * When multiple locations are specified, all of them will be checked for the
+ * presence of a file and the nameservice for a new file/dir is chosen based on
+ * the results of a consistent hashing algorithm.
+ * <p>
+ * Does the Mount table entry for this path have multiple destinations?
+ * <ul>
+ * <li>No -&gt; Return the location
+ * <li>Yes -&gt; Return all locations, prioritizing the best guess from the
+ * consistent hashing algorithm.
+ * </ul>
+ * <p>
+ * It has multiple options to order the locations: HASH (default), LOCAL,
+ * RANDOM, and HASH_ALL.
+ * <p>
+ * The consistent hashing result is dependent on the number and combination of
+ * nameservices that are registered for particular mount point. The order of
+ * nameservices/locations in the mount table is not prioritized. Each consistent
+ * hash calculation considers only the set of unique nameservices present for
+ * the mount table location.
+ */
+public class MultipleDestinationMountTableResolver extends MountTableResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MultipleDestinationMountTableResolver.class);
+
+  /** Resolvers that use a particular order for multiple destinations. */
+  private final EnumMap<DestinationOrder, OrderedResolver> orderedResolvers =
+      new EnumMap<>(DestinationOrder.class);
+
+  /** Creates the resolver and registers one ordered resolver per policy. */
+  public MultipleDestinationMountTableResolver(
+      Configuration conf, Router router) {
+    super(conf, router);
+
+    addResolver(DestinationOrder.HASH, new HashFirstResolver());
+    addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
+    addResolver(DestinationOrder.RANDOM, new RandomResolver());
+    addResolver(DestinationOrder.HASH_ALL, new HashResolver());
+  }
+
+  @Override
+  public PathLocation getDestinationForPath(String path) throws IOException {
+    // Let the parent resolve the path, then reorder multiple destinations
+    PathLocation mountTableResult = super.getDestinationForPath(path);
+    if (mountTableResult == null) {
+      // Name the parent explicitly: super.getClass() is the runtime class
+      LOG.error("The {} cannot find a location for {}",
+          MountTableResolver.class.getSimpleName(), path);
+    } else if (mountTableResult.hasMultipleDestinations()) {
+      DestinationOrder order = mountTableResult.getDestinationOrder();
+      OrderedResolver orderedResolver = orderedResolvers.get(order);
+      if (orderedResolver == null) {
+        LOG.error("Cannot find resolver for order {}", order);
+      } else {
+        String firstNamespace =
+            orderedResolver.getFirstNamespace(path, mountTableResult);
+        // Change the order of the name spaces according to the policy
+        if (firstNamespace != null) {
+          // This is the entity in the tree, we need to create our own copy
+          mountTableResult = new PathLocation(mountTableResult, firstNamespace);
+          LOG.debug("Ordered locations following {} are {}",
+              order, mountTableResult);
+        } else {
+          LOG.error("Cannot get main namespace for path {} with order {}",
+              path, order);
+        }
+      }
+    }
+    return mountTableResult;
+  }
+
+  /** Registers the resolver for an order, replacing any previous one. */
+  @VisibleForTesting
+  public void addResolver(DestinationOrder order, OrderedResolver resolver) {
+    orderedResolvers.put(order, resolver);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java
new file mode 100644
index 0000000..fe82f29
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.util.Comparator;
+
+/**
+ * Compares NNs in the same namespace and prioritizes by their status. The
+ * priorities are:
+ * <ul>
+ * <li>ACTIVE
+ * <li>STANDBY
+ * <li>UNAVAILABLE
+ * </ul>
+ * When two NNs have the same state, the last modification date is the tie
+ * breaker, newest has priority. Expired NNs are excluded.
+ */
+public class NamenodePriorityComparator
+    implements Comparator<FederationNamenodeContext> {
+
+  @Override
+  public int compare(FederationNamenodeContext o1,
+      FederationNamenodeContext o2) {
+    FederationNamenodeServiceState state1 = o1.getState();
+    FederationNamenodeServiceState state2 = o2.getState();
+
+    if (state1 == state2) {
+      // Both have the same state, use mod dates
+      return compareModDates(o1, o2);
+    } else {
+      // Enum is ordered by priority
+      return state1.compareTo(state2);
+    }
+  }
+
+  /**
+   * Compare the modification dates, newest first.
+   * @param o1 Context 1.
+   * @param o2 Context 2.
+   * @return Negative if o1 is newer, positive if o2 is newer, 0 if equal.
+   */
+  private int compareModDates(FederationNamenodeContext o1,
+      FederationNamenodeContext o2) {
+    // Reverse sort, lowest position is highest priority. Compare instead of
+    // subtracting: a long difference cast to int can wrap and flip the sign.
+    return Long.compare(o2.getDateModified(), o1.getDateModified());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
new file mode 100644
index 0000000..c3c6fa8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/**
+ * Status of the namenode: identifiers, addresses, HA state, safe mode and
+ * the datanode and namesystem statistics, with flags for what has been set.
+ */
+public class NamenodeStatusReport {
+
+  /** Namenode information given at construction. */
+  private final String nameserviceId;
+  private final String namenodeId;
+  private final String rpcAddress;
+  private final String serviceAddress;
+  private final String lifelineAddress;
+  private final String webAddress;
+
+  /** Namenode information taken from the namespace info. */
+  private String clusterId = "";
+  private String blockPoolId = "";
+
+  /** Namenode state. */
+  private HAServiceState status = HAServiceState.STANDBY;
+  private boolean safeMode = false;
+
+  /** Datanodes stats. */
+  private int liveDatanodes = -1;
+  private int deadDatanodes = -1;
+  /** Decommissioning datanodes. */
+  private int decomDatanodes = -1;
+  /** Live decommissioned datanodes. */
+  private int liveDecomDatanodes = -1;
+  /** Dead decommissioned datanodes. */
+  private int deadDecomDatanodes = -1;
+
+  /** Space stats. */
+  private long availableSpace = -1;
+  private long numOfFiles = -1;
+  private long numOfBlocks = -1;
+  private long numOfBlocksMissing = -1;
+  private long numOfBlocksPendingReplication = -1;
+  private long numOfBlocksUnderReplicated = -1;
+  private long numOfBlocksPendingDeletion = -1;
+  private long totalSpace = -1;
+  private long providedSpace = -1;
+
+  /** If the fields are valid. */
+  private boolean registrationValid = false;
+  private boolean statsValid = false;
+  private boolean haStateValid = false;
+
+  /**
+   * Create a report for a Namenode; all validity flags start as false.
+   *
+   * @param ns Name service identifier.
+   * @param nn Namenode identifier.
+   * @param rpc RPC address.
+   * @param service Service RPC address.
+   * @param lifeline Lifeline RPC address.
+   * @param web Web address.
+   */
+  public NamenodeStatusReport(String ns, String nn, String rpc, String service,
+      String lifeline, String web) {
+    this.nameserviceId = ns;
+    this.namenodeId = nn;
+    this.rpcAddress = rpc;
+    this.serviceAddress = service;
+    this.lifelineAddress = lifeline;
+    this.webAddress = web;
+  }
+
+  /**
+   * If the statistics are valid.
+   * @return True once datanode or namesystem information has been set.
+   */
+  public boolean statsValid() {
+    return statsValid;
+  }
+
+  /**
+   * If the registration is valid.
+   * @return True after setNamespaceInfo, or as set by setRegistrationValid.
+   */
+  public boolean registrationValid() {
+    return registrationValid;
+  }
+
+  /**
+   * If the HA state is valid.
+   * @return True once an HA service state has been set.
+   */
+  public boolean haStateValid() {
+    return haStateValid;
+  }
+
+  /**
+   * Get the state of the Namenode being monitored.
+   *
+   * @return UNAVAILABLE while the registration is not valid; otherwise the
+   *         state mapped from the HA service state if one was set, or ACTIVE
+   *         if none was.
+   */
+  public FederationNamenodeServiceState getState() {
+    if (!registrationValid) {
+      return FederationNamenodeServiceState.UNAVAILABLE;
+    }
+    if (!haStateValid) {
+      return FederationNamenodeServiceState.ACTIVE;
+    }
+    return FederationNamenodeServiceState.getState(status);
+  }
+
+  /**
+   * Get the name service identifier.
+   * @return The name service identifier.
+   */
+  public String getNameserviceId() {
+    return nameserviceId;
+  }
+
+  /**
+   * Get the namenode identifier.
+   * @return The namenode identifier.
+   */
+  public String getNamenodeId() {
+    return namenodeId;
+  }
+
+  /**
+   * Get the cluster identifier.
+   * @return The cluster identifier.
+   */
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * Get the block pool identifier.
+   * @return The block pool identifier.
+   */
+  public String getBlockPoolId() {
+    return blockPoolId;
+  }
+
+  /**
+   * Get the RPC address.
+   * @return The RPC address.
+   */
+  public String getRpcAddress() {
+    return rpcAddress;
+  }
+
+  /**
+   * Get the Service RPC address.
+   * @return The Service RPC address.
+   */
+  public String getServiceAddress() {
+    return serviceAddress;
+  }
+
+  /**
+   * Get the Lifeline RPC address.
+   * @return The Lifeline RPC address.
+   */
+  public String getLifelineAddress() {
+    return lifelineAddress;
+  }
+
+  /**
+   * Get the web address.
+   * @return The web address.
+   */
+  public String getWebAddress() {
+    return webAddress;
+  }
+
+  /**
+   * Set the HA service state and mark it as valid.
+   *
+   * @param state The HA service state.
+   */
+  public void setHAServiceState(HAServiceState state) {
+    this.status = state;
+    this.haStateValid = true;
+  }
+
+  /**
+   * Set the namespace information and mark the registration as valid.
+   *
+   * @param info Namespace information.
+   */
+  public void setNamespaceInfo(NamespaceInfo info) {
+    this.clusterId = info.getClusterID();
+    this.blockPoolId = info.getBlockPoolID();
+    this.registrationValid = true;
+  }
+
+  /**
+   * Set the safe mode flag.
+   * @param safemode Value to store as the safe mode flag.
+   */
+  public void setSafeMode(boolean safemode) {
+    this.safeMode = safemode;
+  }
+
+  /**
+   * Get the safe mode flag.
+   * @return The value last stored with setSafeMode, false by default.
+   */
+  public boolean getSafemode() {
+    return safeMode;
+  }
+
+  /**
+   * Set the datanode information and mark the statistics as valid.
+   *
+   * @param numLive Number of live nodes.
+   * @param numDead Number of dead nodes.
+   * @param numDecom Number of decommissioning nodes.
+   * @param numLiveDecom Number of decommissioned live nodes.
+   * @param numDeadDecom Number of decommissioned dead nodes.
+   */
+  public void setDatanodeInfo(int numLive, int numDead, int numDecom,
+      int numLiveDecom, int numDeadDecom) {
+    this.liveDatanodes = numLive;
+    this.deadDatanodes = numDead;
+    this.decomDatanodes = numDecom;
+    this.liveDecomDatanodes = numLiveDecom;
+    this.deadDecomDatanodes = numDeadDecom;
+    this.statsValid = true;
+  }
+
+  /**
+   * Get the number of live datanodes.
+   * @return The number of live nodes.
+   */
+  public int getNumLiveDatanodes() {
+    return liveDatanodes;
+  }
+
+  /**
+   * Get the number of dead datanodes.
+   * @return The number of dead nodes.
+   */
+  public int getNumDeadDatanodes() {
+    return deadDatanodes;
+  }
+
+  /**
+   * Get the number of decommissioning nodes.
+   * @return The number of decommissioning nodes.
+   */
+  public int getNumDecommissioningDatanodes() {
+    return decomDatanodes;
+  }
+
+  /**
+   * Get the number of live decommissioned nodes.
+   * @return The number of live decommissioned nodes.
+   */
+  public int getNumDecomLiveDatanodes() {
+    return liveDecomDatanodes;
+  }
+
+  /**
+   * Get the number of dead decommissioned nodes.
+   * @return The number of dead decommissioned nodes.
+   */
+  public int getNumDecomDeadDatanodes() {
+    return deadDecomDatanodes;
+  }
+
+  /**
+   * Set the filesystem information and mark the statistics as valid.
+   *
+   * @param available Available capacity.
+   * @param total Total capacity.
+   * @param numFiles Number of files.
+   * @param numBlocks Total number of blocks.
+   * @param numBlocksMissing Number of missing blocks.
+   * @param numBlocksPendingReplication Number of blocks pending replication.
+   * @param numBlocksUnderReplicated Number of blocks under replication.
+   * @param numBlocksPendingDeletion Number of blocks pending deletion.
+   * @param providedSpace Space occupied by provided storage.
+   */
+  public void setNamesystemInfo(long available, long total,
+      long numFiles, long numBlocks, long numBlocksMissing,
+      long numBlocksPendingReplication, long numBlocksUnderReplicated,
+      long numBlocksPendingDeletion, long providedSpace) {
+    this.availableSpace = available;
+    this.totalSpace = total;
+    this.numOfFiles = numFiles;
+    this.numOfBlocks = numBlocks;
+    this.numOfBlocksMissing = numBlocksMissing;
+    this.numOfBlocksPendingReplication = numBlocksPendingReplication;
+    this.numOfBlocksUnderReplicated = numBlocksUnderReplicated;
+    this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
+    this.providedSpace = providedSpace;
+    this.statsValid = true;
+  }
+
+  /**
+   * Get the number of blocks.
+   * @return The number of blocks.
+   */
+  public long getNumBlocks() {
+    return numOfBlocks;
+  }
+
+  /**
+   * Get the number of files.
+   * @return The number of files.
+   */
+  public long getNumFiles() {
+    return numOfFiles;
+  }
+
+  /**
+   * Get the total space.
+   * @return The total space.
+   */
+  public long getTotalSpace() {
+    return totalSpace;
+  }
+
+  /**
+   * Get the available space.
+   * @return The available space.
+   */
+  public long getAvailableSpace() {
+    return availableSpace;
+  }
+
+  /**
+   * Get the space occupied by provided storage.
+   * @return the provided capacity.
+   */
+  public long getProvidedSpace() {
+    return providedSpace;
+  }
+
+  /**
+   * Get the number of missing blocks.
+   * @return Number of missing blocks.
+   */
+  public long getNumBlocksMissing() {
+    return numOfBlocksMissing;
+  }
+
+  /**
+   * Get the number of pending replication blocks.
+   * @return Number of pending replication blocks.
+   */
+  public long getNumOfBlocksPendingReplication() {
+    return numOfBlocksPendingReplication;
+  }
+
+  /**
+   * Get the number of under replicated blocks.
+   * @return Number of under replicated blocks.
+   */
+  public long getNumOfBlocksUnderReplicated() {
+    return numOfBlocksUnderReplicated;
+  }
+
+  /**
+   * Get the number of pending deletion blocks.
+   * @return Number of pending deletion blocks.
+   */
+  public long getNumOfBlocksPendingDeletion() {
+    return numOfBlocksPendingDeletion;
+  }
+
+  /**
+   * Set the validity of registration.
+   * @param isValid The desired value to be set.
+   */
+  public void setRegistrationValid(boolean isValid) {
+    this.registrationValid = isValid;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s-%s:%s",
+        nameserviceId, namenodeId, serviceAddress);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java
new file mode 100644
index 0000000..945d81d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A map of the properties and target destinations (name space + path) for
+ * a path in the global/federated name space.
+ * This data is generated from the {@code MountTable} records.
+ */
+public class PathLocation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PathLocation.class);
+
+
+  /** Source path in global namespace. */
+  private final String sourcePath;
+
+  /**
+   * Remote paths in the target name spaces. Every constructor stores an
+   * unmodifiable list here, so it can be shared without re-wrapping.
+   */
+  private final List<RemoteLocation> destinations;
+  /** Order for the destinations. */
+  private final DestinationOrder destOrder;
+
+
+  /**
+   * Create a new PathLocation. The destination list is wrapped in an
+   * unmodifiable view; it is not copied.
+   *
+   * @param source Source path in the global name space.
+   * @param dest Destinations of the mount table entry.
+   * @param order Order of the locations.
+   */
+  public PathLocation(
+      String source, List<RemoteLocation> dest, DestinationOrder order) {
+    this.sourcePath = source;
+    this.destinations = Collections.unmodifiableList(dest);
+    this.destOrder = order;
+  }
+
+  /**
+   * Create a new PathLocation with default HASH order.
+   *
+   * @param source Source path in the global name space.
+   * @param dest Destinations of the mount table entry.
+   */
+  public PathLocation(String source, List<RemoteLocation> dest) {
+    this(source, dest, DestinationOrder.HASH);
+  }
+
+  /**
+   * Create a path location from another path.
+   *
+   * @param other Other path location to copy from.
+   */
+  public PathLocation(final PathLocation other) {
+    this.sourcePath = other.sourcePath;
+    // Already an unmodifiable list; wrapping it again adds nothing
+    this.destinations = other.destinations;
+    this.destOrder = other.destOrder;
+  }
+
+  /**
+   * Create a path location from another path with the destinations sorted.
+   *
+   * @param other Other path location to copy from.
+   * @param firstNsId Identifier of the namespace to place first.
+   */
+  public PathLocation(PathLocation other, String firstNsId) {
+    this.sourcePath = other.sourcePath;
+    this.destOrder = other.destOrder;
+    this.destinations = orderedNamespaces(other.destinations, firstNsId);
+  }
+
+  /**
+   * Prioritize a location/destination by its name space/nameserviceId.
+   * This destination might be used by other threads, so the source is not
+   * modifiable.
+   * <p>
+   * A list with at most one destination is returned as is. Otherwise every
+   * destination whose nameservice identifier equals {@code nsId} is moved to
+   * the front of a new list; when several destinations match, each one is
+   * inserted at the head in turn, so they end up in reverse relative order.
+   *
+   * @param original List of destinations to order.
+   * @param nsId The name space/nameserviceID to prioritize.
+   * @return Prioritized list of destinations that cannot be modified.
+   */
+  private static List<RemoteLocation> orderedNamespaces(
+      final List<RemoteLocation> original, final String nsId) {
+    if (original.size() <= 1) {
+      return original;
+    }
+
+    LinkedList<RemoteLocation> newDestinations = new LinkedList<>();
+    boolean found = false;
+    for (RemoteLocation dest : original) {
+      if (dest.getNameserviceId().equals(nsId)) {
+        found = true;
+        newDestinations.addFirst(dest);
+      } else {
+        newDestinations.add(dest);
+      }
+    }
+
+    if (!found) {
+      LOG.debug("Cannot find location with namespace {} in {}",
+          nsId, original);
+    }
+    return Collections.unmodifiableList(newDestinations);
+  }
+
+  /**
+   * Get the source path in the global namespace for this path location.
+   *
+   * @return The path in the global namespace.
+   */
+  public String getSourcePath() {
+    return this.sourcePath;
+  }
+
+  /**
+   * Get the subclusters defined for the destinations.
+   *
+   * @return New set containing the nameservice identifier of each destination.
+   */
+  public Set<String> getNamespaces() {
+    Set<String> namespaces = new HashSet<>();
+    for (RemoteLocation location : this.destinations) {
+      namespaces.add(location.getNameserviceId());
+    }
+    return namespaces;
+  }
+
+  /**
+   * Render the destinations as a comma separated list of
+   * {@code nameservice->path} pairs, followed by the destination order in
+   * square brackets when there is more than one destination.
+   *
+   * @return Text representation of this location.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (RemoteLocation destination : this.destinations) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(destination.getNameserviceId())
+          .append("->")
+          .append(destination.getDest());
+    }
+    if (this.destinations.size() > 1) {
+      // Append the object itself: a null order prints "null" instead of
+      // throwing a NullPointerException
+      sb.append(" [").append(this.destOrder).append("]");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Check if this location supports multiple clusters/paths.
+   *
+   * @return If it has multiple destinations.
+   */
+  public boolean hasMultipleDestinations() {
+    return this.destinations.size() > 1;
+  }
+
+  /**
+   * Get the list of locations found in the mount table.
+   * The first result is the highest priority path.
+   *
+   * @return Unmodifiable list of remote locations.
+   */
+  public List<RemoteLocation> getDestinations() {
+    // The stored list is already unmodifiable
+    return this.destinations;
+  }
+
+  /**
+   * Get the order for the destinations.
+   *
+   * @return Order for the destinations.
+   */
+  public DestinationOrder getDestinationOrder() {
+    return this.destOrder;
+  }
+
+  /**
+   * Get the default or highest priority location.
+   *
+   * @return The default location.
+   * @throws UnsupportedOperationException If there are no destinations or
+   *           the first destination has no path.
+   */
+  public RemoteLocation getDefaultLocation() {
+    if (destinations.isEmpty() || destinations.get(0).getDest() == null) {
+      throw new UnsupportedOperationException(
+          "Unsupported path " + sourcePath + " please check mount table");
+    }
+    return destinations.get(0);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
new file mode 100644
index 0000000..6aa12ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
+
+/**
+ * A single location in a remote namespace, consisting of a nameservice ID
+ * and an HDFS path.
+ */
+public class RemoteLocation extends RemoteLocationContext {
+
+  /** Identifier of the remote namespace for this location. */
+  private final String nameserviceId;
+  /** Identifier of the namenode in the namespace; null if not specified. */
+  private final String namenodeId;
+  /** Path in the remote location. */
+  private final String path;
+
+  /**
+   * Create a new remote location without a particular namenode.
+   *
+   * @param nsId Destination namespace.
+   * @param pPath Path in the destination namespace.
+   */
+  public RemoteLocation(String nsId, String pPath) {
+    this(nsId, null, pPath);
+  }
+
+  /**
+   * Create a new remote location pointing to a particular namenode in the
+   * namespace.
+   *
+   * @param nsId Destination namespace.
+   * @param nnId Namenode in the destination namespace; may be null.
+   * @param pPath Path in the destination namespace.
+   */
+  public RemoteLocation(String nsId, String nnId, String pPath) {
+    this.nameserviceId = nsId;
+    this.namenodeId = nnId;
+    this.path = pPath;
+  }
+
+  /**
+   * Get the nameservice identifier. When a namenode identifier was given, it
+   * is appended after a dash.
+   *
+   * @return Nameservice identifier, optionally followed by "-" and the
+   *         namenode identifier.
+   */
+  @Override
+  public String getNameserviceId() {
+    if (this.namenodeId == null) {
+      return this.nameserviceId;
+    }
+    return this.nameserviceId + "-" + this.namenodeId;
+  }
+
+  /**
+   * Get the path in the remote location.
+   *
+   * @return Path in the destination namespace.
+   */
+  @Override
+  public String getDest() {
+    return this.path;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder();
+    text.append(getNameserviceId());
+    text.append("->");
+    text.append(this.path);
+    return text.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
new file mode 100644
index 0000000..03e68e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+/**
+ * Order of the destinations when we have multiple of them. When the resolver
+ * of files to subclusters (FileSubclusterResolver) has multiple destinations,
+ * this determines which location should be checked first.
+ */
+public enum DestinationOrder {
+  HASH, // Consistent hashing using only the first folder level
+  LOCAL, // Local first
+  RANDOM, // Random order
+  HASH_ALL // Consistent hashing (NOTE(review): presumably full path; confirm)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
new file mode 100644
index 0000000..831b082
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+
+/**
+ * Variation of HashResolver that only uses the first level of the path.
+ */
+public class HashFirstResolver extends HashResolver {
+
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    final String firstLevelPath = trimPathToChild(path, loc.getSourcePath());
+    LOG.debug("Only using the first part of the path: {} -> {}",
+        path, firstLevelPath);
+    return super.getFirstNamespace(firstLevelPath, loc);
+  }
+
+  /**
+   * Cut a path down to, at most, the immediate child of a parent path, so
+   * that only that first level takes part in the hashing. This avoids having
+   * to create/maintain subtrees under each multi-destination mount point:
+   * each child of a multi-destination mount is mapped to only one hash
+   * location.
+   * <p>
+   * Examples:
+   * <ul>
+   * <li>path = /a/b/c, parent = /a is trimmed to /a/b.
+   * <li>path = /a/b, parent = /a/b is trimmed to /a/b.
+   * </ul>
+   * The parent itself is returned when the path is not longer than the parent
+   * or when no child component follows it.
+   *
+   * @param path The path to trim.
+   * @param parent The parent used to find the immediate child.
+   * @return Trimmed path.
+   */
+  private static String trimPathToChild(String path, String parent) {
+    // Path is invalid or equal to the parent
+    if (path.length() <= parent.length()) {
+      return parent;
+    }
+
+    // What follows the parent, without its leading separator
+    final String relative =
+        path.substring(parent.length()).replaceFirst("^/", "");
+    final String[] parts = relative.split(Path.SEPARATOR);
+    if (parts.length == 0 || parts[0].length() == 0) {
+      return parent;
+    }
+
+    final String child = parts[0];
+    return parent.endsWith(Path.SEPARATOR)
+        ? parent + child
+        : parent + Path.SEPARATOR + child;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
new file mode 100644
index 0000000..4034a46
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.utils.ConsistentHashRing;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Order the destinations based on consistent hashing.
+ */
+public class HashResolver implements OrderedResolver {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(HashResolver.class);
+
+
+  /**
+   * Namespace set -> Locator. Keyed by the set itself and not by its hash
+   * code: distinct sets can share a hash code (e.g., {ns0, ns3} and
+   * {ns1, ns2}) and must not share a locator.
+   */
+  private final Map<Set<String>, ConsistentHashRing> hashResolverMap;
+
+  /** Patterns for temporary files. */
+  private static final String HEX_PATTERN = "\\p{XDigit}";
+  private static final String UUID_PATTERN = HEX_PATTERN + "{8}-" +
+      HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" +
+      HEX_PATTERN + "{12}";
+  private static final String ATTEMPT_PATTERN =
+      "attempt_\\d+_\\d{4}_._\\d{6}_\\d{2}";
+  private static final String[] TEMP_FILE_PATTERNS = {
+      "(.+)\\.COPYING$",
+      "(.+)\\._COPYING_.*$",
+      "(.+)\\.tmp$",
+      "_temp/(.+)$",
+      "_temporary/(.+)\\." + UUID_PATTERN + "$",
+      "(.*)_temporary/\\d/_temporary/" + ATTEMPT_PATTERN + "/(.+)$" };
+  /** Pattern for temporary files (or of the individual patterns). */
+  private static final Pattern TEMP_FILE_PATTERN =
+      Pattern.compile(StringUtils.join("|", TEMP_FILE_PATTERNS));
+
+
+  public HashResolver() {
+    this.hashResolverMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Use the result from consistent hashing locator to prioritize the locations
+   * for a path. Temporary file names are first reduced to their final name so
+   * both hash to the same namespace.
+   *
+   * @param path Path to check.
+   * @param loc Federated location with multiple destinations.
+   * @return First namespace based on hash; null if the locator returns none.
+   */
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    String finalPath = extractTempFileName(path);
+    Set<String> namespaces = loc.getNamespaces();
+    ConsistentHashRing locator = getHashResolver(namespaces);
+    String hashedSubcluster = locator.getLocation(finalPath);
+    if (hashedSubcluster == null) {
+      String srcPath = loc.getSourcePath();
+      LOG.error("Cannot find subcluster for {} ({} -> {})",
+          srcPath, path, finalPath);
+    }
+    LOG.debug("Namespace for {} ({}) is {}", path, finalPath, hashedSubcluster);
+    return hashedSubcluster;
+  }
+
+  /**
+   * Get the cached (if available) or generate a new hash resolver for this
+   * particular set of unique namespace identifiers.
+   *
+   * @param namespaces A set of unique namespace identifiers.
+   * @return A hash resolver configured to consistently resolve paths to
+   *         namespaces using the provided set of namespace identifiers.
+   */
+  private ConsistentHashRing getHashResolver(final Set<String> namespaces) {
+    ConsistentHashRing resolver = this.hashResolverMap.get(namespaces);
+    if (resolver == null) {
+      // Key on a private snapshot so later changes to the caller's set cannot
+      // corrupt the cache
+      Set<String> key = new HashSet<>(namespaces);
+      ConsistentHashRing created = new ConsistentHashRing(key);
+      // Keep a single ring per set even if two threads get here at once
+      resolver = this.hashResolverMap.putIfAbsent(key, created);
+      if (resolver == null) {
+        resolver = created;
+      }
+    }
+    return resolver;
+  }
+
+  /**
+   * Some files use a temporary naming pattern. Extract the final name from the
+   * temporary name. For example, files *._COPYING_ will be renamed, so we
+   * remove that chunk. The result is the concatenation of the capturing
+   * groups of the first match; the input is returned unchanged when nothing
+   * matches or the groups are empty.
+   *
+   * @param input Input string.
+   * @return Final file name.
+   */
+  @VisibleForTesting
+  public static String extractTempFileName(final String input) {
+    StringBuilder sb = new StringBuilder();
+    Matcher matcher = TEMP_FILE_PATTERN.matcher(input);
+    if (matcher.find()) {
+      // Only the groups of the alternative that matched are non-null
+      for (int i = 1; i <= matcher.groupCount(); i++) {
+        String match = matcher.group(i);
+        if (match != null) {
+          sb.append(match);
+        }
+      }
+    }
+    if (sb.length() > 0) {
+      String ret = sb.toString();
+      LOG.debug("Extracted {} from {}", ret, input);
+      return ret;
+    }
+    return input;
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message