From: weiy@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 20 Mar 2018 05:19:03 -0000
Message-Id: <2e94135dd05d4b25aa173e94e590a0a0@git.apache.org>
Subject: [37/39] hadoop git commit: HDFS-13215. RBF: Move Router to its own module. Contributed by Wei Yan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java
new file mode 100644
index 0000000..af9f493
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FileSubclusterResolver.java
@@ -0,0 +1,75 @@
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface to map a file path in the global name space to a specific + * subcluster and path in an HDFS name space. + *
<p>
+ * Each path in the global/federated namespace may map to 1-N different HDFS + * locations. Each location specifies a single nameservice and a single HDFS + * path. The behavior is similar to MergeFS and Nfly, allowing multiple HDFS + * locations to be merged into a single path. See HADOOP-8298 and + * HADOOP-12077. + *
<p>
+ * For example, a directory listing will fetch listings for each destination + * path and combine them into a single set of results. + *
<p>
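
To make the contract above concrete, here is a minimal caller sketch. It is not part of this patch; it assumes a concrete resolver (such as the MountTableResolver added later in this commit) with mount entries already loaded:

package org.apache.hadoop.hdfs.server.federation.resolver;

import java.io.IOException;
import java.util.List;

// Hypothetical caller of FileSubclusterResolver; sketch only.
public class ResolverClientSketch {
  static void describe(FileSubclusterResolver resolver, String globalPath)
      throws IOException {
    // The first destination in the returned PathLocation is the
    // highest-priority one.
    PathLocation location = resolver.getDestinationForPath(globalPath);
    if (location == null) {
      System.out.println(globalPath + " is not mapped to any subcluster");
      return;
    }
    System.out.println("Destination: " + location.getDefaultLocation());

    // Immediate mount points below the path; null when the path is unknown.
    List<String> children = resolver.getMountPoints(globalPath);
    System.out.println("Mount points: " + children);
  }
}
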
+ * When multiple destinations are available for a path, the destinations are + * prioritized in a consistent manner. This allows the proxy server to + * guess the best/most likely destination and attempt it first. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FileSubclusterResolver { + + /** + * Get the destinations for a global path. Results are from the mount table + * cache. If multiple destinations are available, the first result is the + * highest-priority destination. + * + * @param path Global path. + * @return Location in a destination namespace or null if it does not exist. + * @throws IOException Throws exception if the data is not available. + */ + PathLocation getDestinationForPath(String path) throws IOException; + + /** + * Get a list of mount points for a path. Results are from the mount table + * cache. + * + * @param path Global path. + * @return List of mount points present at this path or zero-length list if + * none are found. + * @throws IOException Throws exception if the data is not available. + */ + List<String> getMountPoints(String path) throws IOException; + + /** + * Get the default namespace for the cluster. + * + * @return Default namespace identifier. + */ + String getDefaultNamespace(); +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
new file mode 100644
index 0000000..b87eeec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -0,0 +1,317 @@
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE; +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED; +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a cached lookup of the most recently active namenode for a + * particular nameservice. Relies on the {@link StateStoreService} to + * discover available nameservices and namenodes. + */ +public class MembershipNamenodeResolver + implements ActiveNamenodeResolver, StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(MembershipNamenodeResolver.class); + + /** Reference to the State Store. */ + private final StateStoreService stateStore; + /** Membership State Store interface. */ + private MembershipStore membershipInterface; + + /** Parent router ID. */ + private String routerId; + + /** Cached lookup of NN for nameservice. Invalidated on cache refresh. */ + private Map> cacheNS; + /** Cached lookup of NN for block pool. Invalidated on cache refresh. 
*/ + private Map> cacheBP; + + + public MembershipNamenodeResolver( + Configuration conf, StateStoreService store) throws IOException { + this.stateStore = store; + + this.cacheNS = new ConcurrentHashMap<>(); + this.cacheBP = new ConcurrentHashMap<>(); + + if (this.stateStore != null) { + // Request cache updates from the state store + this.stateStore.registerCacheExternal(this); + } + } + + private synchronized MembershipStore getMembershipStore() throws IOException { + if (this.membershipInterface == null) { + this.membershipInterface = this.stateStore.getRegisteredRecordStore( + MembershipStore.class); + if (this.membershipInterface == null) { + throw new IOException("State Store does not have an interface for " + + MembershipStore.class.getSimpleName()); + } + } + return this.membershipInterface; + } + + @Override + public boolean loadCache(boolean force) { + // Our cache depends on the store, update it first + try { + MembershipStore membership = getMembershipStore(); + membership.loadCache(force); + } catch (IOException e) { + LOG.error("Cannot update membership from the State Store", e); + } + + // Force refresh of active NN cache + cacheBP.clear(); + cacheNS.clear(); + return true; + } + + @Override + public void updateActiveNamenode( + final String nsId, final InetSocketAddress address) throws IOException { + + // Called when we have an RPC miss and successful hit on an alternate NN. + // Temporarily update our cache, it will be overwritten on the next update. + try { + MembershipState partial = MembershipState.newInstance(); + String rpcAddress = address.getHostName() + ":" + address.getPort(); + partial.setRpcAddress(rpcAddress); + partial.setNameserviceId(nsId); + + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + + MembershipStore membership = getMembershipStore(); + GetNamenodeRegistrationsResponse response = + membership.getNamenodeRegistrations(request); + List records = response.getNamenodeMemberships(); + + if (records != null && records.size() == 1) { + MembershipState record = records.get(0); + UpdateNamenodeRegistrationRequest updateRequest = + UpdateNamenodeRegistrationRequest.newInstance( + record.getNameserviceId(), record.getNamenodeId(), ACTIVE); + membership.updateNamenodeRegistration(updateRequest); + } + } catch (StateStoreUnavailableException e) { + LOG.error("Cannot update {} as active, State Store unavailable", address); + } + } + + @Override + public List getNamenodesForNameserviceId( + final String nsId) throws IOException { + + List ret = cacheNS.get(nsId); + if (ret == null) { + try { + MembershipState partial = MembershipState.newInstance(); + partial.setNameserviceId(nsId); + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + + final List result = + getRecentRegistrationForQuery(request, true, false); + if (result == null || result.isEmpty()) { + LOG.error("Cannot locate eligible NNs for {}", nsId); + return null; + } else { + cacheNS.put(nsId, result); + ret = result; + } + } catch (StateStoreUnavailableException e) { + LOG.error("Cannot get active NN for {}, State Store unavailable", nsId); + } + } + if (ret == null) { + return null; + } + return Collections.unmodifiableList(ret); + } + + @Override + public List getNamenodesForBlockPoolId( + final String bpId) throws IOException { + + List ret = cacheBP.get(bpId); + if (ret == null) { + try { + MembershipState partial = MembershipState.newInstance(); + partial.setBlockPoolId(bpId); + 
GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + + final List result = + getRecentRegistrationForQuery(request, true, false); + if (result == null || result.isEmpty()) { + LOG.error("Cannot locate eligible NNs for {}", bpId); + } else { + cacheBP.put(bpId, result); + ret = result; + } + } catch (StateStoreUnavailableException e) { + LOG.error("Cannot get active NN for {}, State Store unavailable", bpId); + return null; + } + } + if (ret == null) { + return null; + } + return Collections.unmodifiableList(ret); + } + + @Override + public boolean registerNamenode(NamenodeStatusReport report) + throws IOException { + + if (this.routerId == null) { + LOG.warn("Cannot register namenode, router ID is not known {}", report); + return false; + } + + MembershipState record = MembershipState.newInstance( + routerId, report.getNameserviceId(), report.getNamenodeId(), + report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(), + report.getServiceAddress(), report.getLifelineAddress(), + report.getWebAddress(), report.getState(), report.getSafemode()); + + if (report.statsValid()) { + MembershipStats stats = MembershipStats.newInstance(); + stats.setNumOfFiles(report.getNumFiles()); + stats.setNumOfBlocks(report.getNumBlocks()); + stats.setNumOfBlocksMissing(report.getNumBlocksMissing()); + stats.setNumOfBlocksPendingReplication( + report.getNumOfBlocksPendingReplication()); + stats.setNumOfBlocksUnderReplicated( + report.getNumOfBlocksUnderReplicated()); + stats.setNumOfBlocksPendingDeletion( + report.getNumOfBlocksPendingDeletion()); + stats.setAvailableSpace(report.getAvailableSpace()); + stats.setTotalSpace(report.getTotalSpace()); + stats.setProvidedSpace(report.getProvidedSpace()); + stats.setNumOfDecommissioningDatanodes( + report.getNumDecommissioningDatanodes()); + stats.setNumOfActiveDatanodes(report.getNumLiveDatanodes()); + stats.setNumOfDeadDatanodes(report.getNumDeadDatanodes()); + stats.setNumOfDecomActiveDatanodes(report.getNumDecomLiveDatanodes()); + stats.setNumOfDecomDeadDatanodes(report.getNumDecomDeadDatanodes()); + record.setStats(stats); + } + + if (report.getState() != UNAVAILABLE) { + // Set/update our last contact time + record.setLastContact(Time.now()); + } + + NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance(); + request.setNamenodeMembership(record); + return getMembershipStore().namenodeHeartbeat(request).getResult(); + } + + @Override + public Set getNamespaces() throws IOException { + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + getMembershipStore().getNamespaceInfo(request); + return response.getNamespaceInfo(); + } + + /** + * Picks the most relevant record registration that matches the query. Return + * registrations matching the query in this preference: 1) Most recently + * updated ACTIVE registration 2) Most recently updated STANDBY registration + * (if showStandby) 3) Most recently updated UNAVAILABLE registration (if + * showUnavailable). EXPIRED registrations are ignored. + * + * @param request The select query for NN registrations. + * @param addUnavailable include UNAVAILABLE registrations. + * @param addExpired include EXPIRED registrations. + * @return List of memberships or null if no registrations that + * both match the query AND the selected states. 
+ * @throws IOException + */ + private List getRecentRegistrationForQuery( + GetNamenodeRegistrationsRequest request, boolean addUnavailable, + boolean addExpired) throws IOException { + + // Retrieve a list of all registrations that match this query. + // This may include all NN records for a namespace/blockpool, including + // duplicate records for the same NN from different routers. + MembershipStore membershipStore = getMembershipStore(); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + + List memberships = response.getNamenodeMemberships(); + if (!addExpired || !addUnavailable) { + Iterator iterator = memberships.iterator(); + while (iterator.hasNext()) { + MembershipState membership = iterator.next(); + if (membership.getState() == EXPIRED && !addExpired) { + iterator.remove(); + } else if (membership.getState() == UNAVAILABLE && !addUnavailable) { + iterator.remove(); + } + } + } + + List priorityList = new ArrayList<>(); + priorityList.addAll(memberships); + Collections.sort(priorityList, new NamenodePriorityComparator()); + + LOG.debug("Selected most recent NN {} for query", priorityList); + return priorityList; + } + + @Override + public void setRouterId(String router) { + this.routerId = router; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java new file mode 100644 index 0000000..c2e4a5b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; + +/** + * Manage a mount table. + */ +public interface MountTableManager { + + /** + * Add an entry to the mount table. + * + * @param request Fully populated request object. + * @return True if the mount table entry was successfully committed to the + * data store. + * @throws IOException Throws exception if the data store is not initialized. + */ + AddMountTableEntryResponse addMountTableEntry( + AddMountTableEntryRequest request) throws IOException; + + /** + * Updates an existing entry in the mount table. + * + * @param request Fully populated request object. + * @return True if the mount table entry was successfully committed to the + * data store. + * @throws IOException Throws exception if the data store is not initialized. + */ + UpdateMountTableEntryResponse updateMountTableEntry( + UpdateMountTableEntryRequest request) throws IOException; + + /** + * Remove an entry from the mount table. + * + * @param request Fully populated request object. + * @return True the mount table entry was removed from the data store. + * @throws IOException Throws exception if the data store is not initialized. + */ + RemoveMountTableEntryResponse removeMountTableEntry( + RemoveMountTableEntryRequest request) throws IOException; + + /** + * List all mount table entries present at or below the path. Fetches from the + * state store. + * + * @param request Fully populated request object. + * + * @return List of all mount table entries under the path. Zero-length list if + * none are found. + * @throws IOException Throws exception if the data store cannot be queried. + */ + GetMountTableEntriesResponse getMountTableEntries( + GetMountTableEntriesRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java new file mode 100644 index 0000000..9713138 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java @@ -0,0 +1,597 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Mount table to map between global paths and remote locations. This allows the + * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} to map + * the global HDFS view to the remote namespaces. This is similar to + * {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. + * This is implemented as a tree. + */ +public class MountTableResolver + implements FileSubclusterResolver, StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(MountTableResolver.class); + + /** Reference to Router. */ + private final Router router; + /** Reference to the State Store. 
*/ + private final StateStoreService stateStore; + /** Interface to the mount table store. */ + private MountTableStore mountTableStore; + + /** If the tree has been initialized. */ + private boolean init = false; + /** Path -> Remote HDFS location. */ + private final TreeMap tree = new TreeMap<>(); + /** Path -> Remote location. */ + private final Cache locationCache; + + /** Default nameservice when no mount matches the math. */ + private String defaultNameService = ""; + + /** Synchronization for both the tree and the cache. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + + @VisibleForTesting + public MountTableResolver(Configuration conf) { + this(conf, (StateStoreService)null); + } + + public MountTableResolver(Configuration conf, Router routerService) { + this(conf, routerService, null); + } + + public MountTableResolver(Configuration conf, StateStoreService store) { + this(conf, null, store); + } + + public MountTableResolver(Configuration conf, Router routerService, + StateStoreService store) { + this.router = routerService; + if (store != null) { + this.stateStore = store; + } else if (this.router != null) { + this.stateStore = this.router.getStateStore(); + } else { + this.stateStore = null; + } + + int maxCacheSize = conf.getInt( + FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE, + FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT); + this.locationCache = CacheBuilder.newBuilder() + .maximumSize(maxCacheSize) + .build(); + + registerCacheExternal(); + initDefaultNameService(conf); + } + + /** + * Request cache updates from the State Store for this resolver. + */ + private void registerCacheExternal() { + if (this.stateStore != null) { + this.stateStore.registerCacheExternal(this); + } + } + + /** + * Nameservice for APIs that cannot be resolved to a specific one. + * + * @param conf Configuration for this resolver. + */ + private void initDefaultNameService(Configuration conf) { + try { + this.defaultNameService = conf.get( + DFS_ROUTER_DEFAULT_NAMESERVICE, + DFSUtil.getNamenodeNameServiceId(conf)); + } catch (HadoopIllegalArgumentException e) { + LOG.error("Cannot find default name service, setting it to the first"); + Collection nsIds = DFSUtilClient.getNameServiceIds(conf); + this.defaultNameService = nsIds.iterator().next(); + LOG.info("Default name service: {}", this.defaultNameService); + } + } + + /** + * Get a reference for the Router for this resolver. + * + * @return Router for this resolver. + */ + protected Router getRouter() { + return this.router; + } + + /** + * Get the mount table store for this resolver. + * + * @return Mount table store. + * @throws IOException If it cannot connect to the State Store. + */ + protected MountTableStore getMountTableStore() throws IOException { + if (this.mountTableStore == null) { + this.mountTableStore = this.stateStore.getRegisteredRecordStore( + MountTableStore.class); + if (this.mountTableStore == null) { + throw new IOException("State Store does not have an interface for " + + MountTableStore.class); + } + } + return this.mountTableStore; + } + + /** + * Add a mount entry to the table. + * + * @param entry The mount table record to add from the state store. 
+ */ + public void addEntry(final MountTable entry) { + writeLock.lock(); + try { + String srcPath = entry.getSourcePath(); + this.tree.put(srcPath, entry); + invalidateLocationCache(srcPath); + } finally { + writeLock.unlock(); + } + this.init = true; + } + + /** + * Remove a mount table entry. + * + * @param srcPath Source path for the entry to remove. + */ + public void removeEntry(final String srcPath) { + writeLock.lock(); + try { + this.tree.remove(srcPath); + invalidateLocationCache(srcPath); + } finally { + writeLock.unlock(); + } + } + + /** + * Invalidates all cache entries below this path. It requires the write lock. + * + * @param path Source path. + */ + private void invalidateLocationCache(final String path) { + LOG.debug("Invalidating {} from {}", path, locationCache); + if (locationCache.size() == 0) { + return; + } + + // Go through the entries and remove the ones from the path to invalidate + ConcurrentMap map = locationCache.asMap(); + Set> entries = map.entrySet(); + Iterator> it = entries.iterator(); + while (it.hasNext()) { + Entry entry = it.next(); + PathLocation loc = entry.getValue(); + String src = loc.getSourcePath(); + if (src != null) { + if (isParentEntry(src, path)) { + LOG.debug("Removing {}", src); + it.remove(); + } + } else { + String dest = loc.getDefaultLocation().getDest(); + if (dest.startsWith(path)) { + LOG.debug("Removing default cache {}", dest); + it.remove(); + } + } + } + + LOG.debug("Location cache after invalidation: {}", locationCache); + } + + /** + * Updates the mount path tree with a new set of mount table entries. It also + * updates the needed caches. + * + * @param entries Full set of mount table entries to update. + */ + @VisibleForTesting + public void refreshEntries(final Collection entries) { + // The tree read/write must be atomic + writeLock.lock(); + try { + // New entries + Map newEntries = new ConcurrentHashMap<>(); + for (MountTable entry : entries) { + String srcPath = entry.getSourcePath(); + newEntries.put(srcPath, entry); + } + + // Old entries (reversed to sort from the leaves to the root) + Set oldEntries = new TreeSet<>(Collections.reverseOrder()); + for (MountTable entry : getTreeValues("/")) { + String srcPath = entry.getSourcePath(); + oldEntries.add(srcPath); + } + + // Entries that need to be removed + for (String srcPath : oldEntries) { + if (!newEntries.containsKey(srcPath)) { + this.tree.remove(srcPath); + invalidateLocationCache(srcPath); + LOG.info("Removed stale mount point {} from resolver", srcPath); + } + } + + // Entries that need to be added + for (MountTable entry : entries) { + String srcPath = entry.getSourcePath(); + if (!oldEntries.contains(srcPath)) { + // Add node, it does not exist + this.tree.put(srcPath, entry); + invalidateLocationCache(srcPath); + LOG.info("Added new mount point {} to resolver", srcPath); + } else { + // Node exists, check for updates + MountTable existingEntry = this.tree.get(srcPath); + if (existingEntry != null && !existingEntry.equals(entry)) { + LOG.info("Entry has changed from \"{}\" to \"{}\"", + existingEntry, entry); + this.tree.put(srcPath, entry); + invalidateLocationCache(srcPath); + LOG.info("Updated mount point {} in resolver", srcPath); + } + } + } + } finally { + writeLock.unlock(); + } + this.init = true; + } + + /** + * Replaces the current in-memory cached of the mount table with a new + * version fetched from the data store. 
+ */ + @Override + public boolean loadCache(boolean force) { + try { + // Our cache depends on the store, update it first + MountTableStore mountTable = this.getMountTableStore(); + mountTable.loadCache(force); + + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance("/"); + GetMountTableEntriesResponse response = + mountTable.getMountTableEntries(request); + List records = response.getEntries(); + refreshEntries(records); + } catch (IOException e) { + LOG.error("Cannot fetch mount table entries from State Store", e); + return false; + } + return true; + } + + /** + * Clears all data. + */ + public void clear() { + LOG.info("Clearing all mount location caches"); + writeLock.lock(); + try { + this.locationCache.invalidateAll(); + this.tree.clear(); + } finally { + writeLock.unlock(); + } + } + + @Override + public PathLocation getDestinationForPath(final String path) + throws IOException { + verifyMountTable(); + readLock.lock(); + try { + Callable meh = new Callable() { + @Override + public PathLocation call() throws Exception { + return lookupLocation(path); + } + }; + return this.locationCache.get(path, meh); + } catch (ExecutionException e) { + throw new IOException(e); + } finally { + readLock.unlock(); + } + } + + /** + * Build the path location to insert into the cache atomically. It must hold + * the read lock. + * @param path Path to check/insert. + * @return New remote location. + */ + public PathLocation lookupLocation(final String path) { + PathLocation ret = null; + MountTable entry = findDeepest(path); + if (entry != null) { + ret = buildLocation(path, entry); + } else { + // Not found, use default location + RemoteLocation remoteLocation = + new RemoteLocation(defaultNameService, path); + List locations = + Collections.singletonList(remoteLocation); + ret = new PathLocation(null, locations); + } + return ret; + } + + /** + * Get the mount table entry for a path. + * + * @param path Path to look for. + * @return Mount table entry the path belongs. + * @throws IOException If the State Store could not be reached. + */ + public MountTable getMountPoint(final String path) throws IOException { + verifyMountTable(); + return findDeepest(path); + } + + @Override + public List getMountPoints(final String path) throws IOException { + verifyMountTable(); + + Set children = new TreeSet<>(); + readLock.lock(); + try { + String from = path; + String to = path + Character.MAX_VALUE; + SortedMap subMap = this.tree.subMap(from, to); + + boolean exists = false; + for (String subPath : subMap.keySet()) { + String child = subPath; + + // Special case for / + if (!path.equals(Path.SEPARATOR)) { + // Get the children + int ini = path.length(); + child = subPath.substring(ini); + } + + if (child.isEmpty()) { + // This is a mount point but without children + exists = true; + } else if (child.startsWith(Path.SEPARATOR)) { + // This is a mount point with children + exists = true; + child = child.substring(1); + + // We only return immediate children + int fin = child.indexOf(Path.SEPARATOR); + if (fin > -1) { + child = child.substring(0, fin); + } + if (!child.isEmpty()) { + children.add(child); + } + } + } + if (!exists) { + return null; + } + return new LinkedList<>(children); + } finally { + readLock.unlock(); + } + } + + /** + * Get all the mount records at or beneath a given path. + * @param path Path to get the mount points from. + * @return List of mount table records under the path or null if the path is + * not found. 
+ * @throws IOException If it's not connected to the State Store. + */ + public List getMounts(final String path) throws IOException { + verifyMountTable(); + + return getTreeValues(path, false); + } + + /** + * Check if the Mount Table is ready to be used. + * @throws StateStoreUnavailableException If it cannot connect to the store. + */ + private void verifyMountTable() throws StateStoreUnavailableException { + if (!this.init) { + throw new StateStoreUnavailableException("Mount Table not initialized"); + } + } + + @Override + public String toString() { + readLock.lock(); + try { + return this.tree.toString(); + } finally { + readLock.unlock(); + } + } + + /** + * Build a location for this result beneath the discovered mount point. + * + * @param path Path to build for. + * @param entry Mount table entry. + * @return PathLocation containing the namespace, local path. + */ + private static PathLocation buildLocation( + final String path, final MountTable entry) { + + String srcPath = entry.getSourcePath(); + if (!path.startsWith(srcPath)) { + LOG.error("Cannot build location, {} not a child of {}", path, srcPath); + return null; + } + String remainingPath = path.substring(srcPath.length()); + if (remainingPath.startsWith(Path.SEPARATOR)) { + remainingPath = remainingPath.substring(1); + } + + List locations = new LinkedList<>(); + for (RemoteLocation oneDst : entry.getDestinations()) { + String nsId = oneDst.getNameserviceId(); + String dest = oneDst.getDest(); + String newPath = dest; + if (!newPath.endsWith(Path.SEPARATOR) && !remainingPath.isEmpty()) { + newPath += Path.SEPARATOR; + } + newPath += remainingPath; + RemoteLocation remoteLocation = new RemoteLocation(nsId, newPath); + locations.add(remoteLocation); + } + DestinationOrder order = entry.getDestOrder(); + return new PathLocation(srcPath, locations, order); + } + + @Override + public String getDefaultNamespace() { + return this.defaultNameService; + } + + /** + * Find the deepest mount point for a path. + * @param path Path to look for. + * @return Mount table entry. + */ + private MountTable findDeepest(final String path) { + readLock.lock(); + try { + Entry entry = this.tree.floorEntry(path); + while (entry != null && !isParentEntry(path, entry.getKey())) { + entry = this.tree.lowerEntry(entry.getKey()); + } + if (entry == null) { + return null; + } + return entry.getValue(); + } finally { + readLock.unlock(); + } + } + + /** + * Get the mount table entries under a path. + * @param path Path to search from. + * @return Mount Table entries. + */ + private List getTreeValues(final String path) { + return getTreeValues(path, false); + } + + /** + * Get the mount table entries under a path. + * @param path Path to search from. + * @param reverse If the order should be reversed. + * @return Mount Table entries. + */ + private List getTreeValues(final String path, boolean reverse) { + LinkedList ret = new LinkedList<>(); + readLock.lock(); + try { + String from = path; + String to = path + Character.MAX_VALUE; + SortedMap subMap = this.tree.subMap(from, to); + for (MountTable entry : subMap.values()) { + if (!reverse) { + ret.add(entry); + } else { + ret.addFirst(entry); + } + } + } finally { + readLock.unlock(); + } + return ret; + } + + /** + * Get the size of the cache. + * @return Size of the cache. 
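
The findDeepest() lookup above is a longest-prefix match over the sorted TreeMap: floorEntry() jumps to the closest preceding key and lowerEntry() walks back until a true ancestor is found. A self-contained sketch of the same idea, with a simplified stand-in for FederationUtil.isParentEntry():

import java.util.Map;
import java.util.TreeMap;

public class LongestPrefixSketch {
  // Returns the deepest mount point that is an ancestor of path, or null.
  static String findDeepest(TreeMap<String, String> mounts, String path) {
    Map.Entry<String, String> entry = mounts.floorEntry(path);
    while (entry != null && !isParentEntry(path, entry.getKey())) {
      entry = mounts.lowerEntry(entry.getKey());
    }
    return entry == null ? null : entry.getKey();
  }

  // Simplified ancestor check; the real one lives in FederationUtil.
  static boolean isParentEntry(String path, String parent) {
    if (!path.startsWith(parent)) {
      return false;
    }
    // "/a/bc" is not under mount "/a/b"; require an exact match or a
    // path-separator boundary.
    return path.length() == parent.length()
        || parent.equals("/")
        || path.charAt(parent.length()) == '/';
  }

  public static void main(String[] args) {
    TreeMap<String, String> mounts = new TreeMap<>();
    mounts.put("/", "ns0");
    mounts.put("/data", "ns1");
    mounts.put("/data/logs", "ns2");
    System.out.println(findDeepest(mounts, "/data/logs/app1")); // /data/logs
    System.out.println(findDeepest(mounts, "/tmp"));            // /
  }
}
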
+ */ + protected long getCacheSize() { + return this.locationCache.size(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java new file mode 100644 index 0000000..8dd73ec --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.io.IOException; +import java.util.EnumMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.OrderedResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.RandomResolver; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Mount table resolver that supports multiple locations for each mount entry. + * The returned location contains prioritized remote paths from highest priority + * to the lowest priority. Multiple locations for a mount point are optional. + * When multiple locations are specified, both will be checked for the presence + * of a file and the nameservice for a new file/dir is chosen based on the + * results of a consistent hashing algorithm. + *
<p>
+ * Does the Mount table entry for this path have multiple destinations?
+ * <ul>
+ * <li>No -> Return the location
+ * <li>Yes -> Return all locations, prioritizing the best guess from the
+ * consistent hashing algorithm.
+ * </ul>
+ * <p>
+ * It has multiple options to order the locations: HASH (default), LOCAL,
+ * RANDOM, and HASH_ALL.
+ * <p>
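
As a rough illustration of the consistent-hashing idea (the real logic lives in HashResolver and HashFirstResolver; this toy version hashes onto a ring of nameservices, with CRC32 standing in for the actual hash function and a non-empty nameservice set assumed):

import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.zip.CRC32;

public class ConsistentHashSketch {
  // Places each nameservice on a ring and picks the first node at or after
  // the hash of the path. Adding or removing a nameservice only remaps the
  // paths that fall in its arc.
  static String pick(Set<String> nameservices, String path) {
    SortedMap<Long, String> ring = new TreeMap<>();
    for (String ns : new TreeSet<>(nameservices)) {
      ring.put(hash(ns), ns);
    }
    long h = hash(path);
    SortedMap<Long, String> tail = ring.tailMap(h);
    return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
  }

  static long hash(String s) {
    CRC32 crc = new CRC32();
    crc.update(s.getBytes(StandardCharsets.UTF_8));
    return crc.getValue();
  }
}
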
+ * The consistent hashing result is dependent on the number and combination of + * nameservices that are registered for particular mount point. The order of + * nameservices/locations in the mount table is not prioritized. Each consistent + * hash calculation considers only the set of unique nameservices present for + * the mount table location. + */ +public class MultipleDestinationMountTableResolver extends MountTableResolver { + + private static final Logger LOG = + LoggerFactory.getLogger(MultipleDestinationMountTableResolver.class); + + + /** Resolvers that use a particular order for multiple destinations. */ + private EnumMap orderedResolvers = + new EnumMap<>(DestinationOrder.class); + + + public MultipleDestinationMountTableResolver( + Configuration conf, Router router) { + super(conf, router); + + // Initialize the ordered resolvers + addResolver(DestinationOrder.HASH, new HashFirstResolver()); + addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router)); + addResolver(DestinationOrder.RANDOM, new RandomResolver()); + addResolver(DestinationOrder.HASH_ALL, new HashResolver()); + } + + @Override + public PathLocation getDestinationForPath(String path) throws IOException { + PathLocation mountTableResult = super.getDestinationForPath(path); + if (mountTableResult == null) { + LOG.error("The {} cannot find a location for {}", + super.getClass().getSimpleName(), path); + } else if (mountTableResult.hasMultipleDestinations()) { + DestinationOrder order = mountTableResult.getDestinationOrder(); + OrderedResolver orderedResolver = orderedResolvers.get(order); + if (orderedResolver == null) { + LOG.error("Cannot find resolver for order {}", order); + } else { + String firstNamespace = + orderedResolver.getFirstNamespace(path, mountTableResult); + + // Change the order of the name spaces according to the policy + if (firstNamespace != null) { + // This is the entity in the tree, we need to create our own copy + mountTableResult = new PathLocation(mountTableResult, firstNamespace); + LOG.debug("Ordered locations following {} are {}", + order, mountTableResult); + } else { + LOG.error("Cannot get main namespace for path {} with order {}", + path, order); + } + } + } + return mountTableResult; + } + + @VisibleForTesting + public void addResolver(DestinationOrder order, OrderedResolver resolver) { + orderedResolvers.put(order, resolver); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java new file mode 100644 index 0000000..fe82f29 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodePriorityComparator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.util.Comparator; + +/** + * Compares NNs in the same namespace and prioritizes by their status. The + * priorities are: + *
<ul>
+ * <li>ACTIVE
+ * <li>STANDBY
+ * <li>UNAVAILABLE
+ * </ul>
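
A standalone sketch of the ordering this comparator produces, using a hypothetical stand-in for FederationNamenodeContext; note that comparingLong(...).reversed() sidesteps the lossy (int) cast in compareModDates() below:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class PriorityOrderSketch {
  // Hypothetical stand-in for a namenode registration.
  static class Registration {
    final int stateRank;     // 0 = ACTIVE, 1 = STANDBY, 2 = UNAVAILABLE
    final long dateModified; // last heartbeat time
    Registration(int stateRank, long dateModified) {
      this.stateRank = stateRank;
      this.dateModified = dateModified;
    }
    @Override
    public String toString() {
      return stateRank + "@" + dateModified;
    }
  }

  public static void main(String[] args) {
    List<Registration> regs = new ArrayList<>();
    regs.add(new Registration(1, 2000)); // STANDBY, newer
    regs.add(new Registration(0, 1000)); // ACTIVE, older
    regs.add(new Registration(0, 3000)); // ACTIVE, newest
    // State first (enum order encodes priority); newest modification date
    // breaks ties within the same state.
    Comparator<Registration> byPriority =
        Comparator.comparingInt((Registration r) -> r.stateRank)
            .thenComparing(Comparator
                .comparingLong((Registration r) -> r.dateModified)
                .reversed());
    Collections.sort(regs, byPriority);
    System.out.println(regs); // [0@3000, 0@1000, 1@2000]
  }
}
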
+ * When two NNs have the same state, the last modification date is the tie + * breaker, newest has priority. Expired NNs are excluded. + */ +public class NamenodePriorityComparator + implements Comparator { + + @Override + public int compare(FederationNamenodeContext o1, + FederationNamenodeContext o2) { + FederationNamenodeServiceState state1 = o1.getState(); + FederationNamenodeServiceState state2 = o2.getState(); + + if (state1 == state2) { + // Both have the same state, use mode dates + return compareModDates(o1, o2); + } else { + // Enum is ordered by priority + return state1.compareTo(state2); + } + } + + /** + * Compare the modification dates. + * + * @param o1 Context 1. + * @param o2 Context 2. + * @return Comparison between dates. + */ + private int compareModDates(FederationNamenodeContext o1, + FederationNamenodeContext o2) { + // Reverse sort, lowest position is highest priority. + return (int) (o2.getDateModified() - o1.getDateModified()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java new file mode 100644 index 0000000..c3c6fa8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; + +/** + * Status of the namenode. + */ +public class NamenodeStatusReport { + + /** Namenode information. */ + private String nameserviceId = ""; + private String namenodeId = ""; + private String clusterId = ""; + private String blockPoolId = ""; + private String rpcAddress = ""; + private String serviceAddress = ""; + private String lifelineAddress = ""; + private String webAddress = ""; + + /** Namenode state. */ + private HAServiceState status = HAServiceState.STANDBY; + private boolean safeMode = false; + + /** Datanodes stats. */ + private int liveDatanodes = -1; + private int deadDatanodes = -1; + /** Decommissioning datanodes. */ + private int decomDatanodes = -1; + /** Live decommissioned datanodes. */ + private int liveDecomDatanodes = -1; + /** Dead decommissioned datanodes. 
*/ + private int deadDecomDatanodes = -1; + + /** Space stats. */ + private long availableSpace = -1; + private long numOfFiles = -1; + private long numOfBlocks = -1; + private long numOfBlocksMissing = -1; + private long numOfBlocksPendingReplication = -1; + private long numOfBlocksUnderReplicated = -1; + private long numOfBlocksPendingDeletion = -1; + private long totalSpace = -1; + private long providedSpace = -1; + + /** If the fields are valid. */ + private boolean registrationValid = false; + private boolean statsValid = false; + private boolean haStateValid = false; + + public NamenodeStatusReport(String ns, String nn, String rpc, String service, + String lifeline, String web) { + this.nameserviceId = ns; + this.namenodeId = nn; + this.rpcAddress = rpc; + this.serviceAddress = service; + this.lifelineAddress = lifeline; + this.webAddress = web; + } + + /** + * If the statistics are valid. + * + * @return If the statistics are valid. + */ + public boolean statsValid() { + return this.statsValid; + } + + /** + * If the registration is valid. + * + * @return If the registration is valid. + */ + public boolean registrationValid() { + return this.registrationValid; + } + + /** + * If the HA state is valid. + * + * @return If the HA state is valid. + */ + public boolean haStateValid() { + return this.haStateValid; + } + + /** + * Get the state of the Namenode being monitored. + * + * @return State of the Namenode. + */ + public FederationNamenodeServiceState getState() { + if (!registrationValid) { + return FederationNamenodeServiceState.UNAVAILABLE; + } else if (haStateValid) { + return FederationNamenodeServiceState.getState(status); + } else { + return FederationNamenodeServiceState.ACTIVE; + } + } + + /** + * Get the name service identifier. + * + * @return The name service identifier. + */ + public String getNameserviceId() { + return this.nameserviceId; + } + + /** + * Get the namenode identifier. + * + * @return The namenode identifier. + */ + public String getNamenodeId() { + return this.namenodeId; + } + + /** + * Get the cluster identifier. + * + * @return The cluster identifier. + */ + public String getClusterId() { + return this.clusterId; + } + + /** + * Get the block pool identifier. + * + * @return The block pool identifier. + */ + public String getBlockPoolId() { + return this.blockPoolId; + } + + /** + * Get the RPC address. + * + * @return The RPC address. + */ + public String getRpcAddress() { + return this.rpcAddress; + } + + /** + * Get the Service RPC address. + * + * @return The Service RPC address. + */ + public String getServiceAddress() { + return this.serviceAddress; + } + + /** + * Get the Lifeline RPC address. + * + * @return The Lifeline RPC address. + */ + public String getLifelineAddress() { + return this.lifelineAddress; + } + + /** + * Get the web address. + * + * @return The web address. + */ + public String getWebAddress() { + return this.webAddress; + } + + /** + * Get the HA service state. + * + * @return The HA service state. + */ + public void setHAServiceState(HAServiceState state) { + this.status = state; + this.haStateValid = true; + } + + /** + * Set the namespace information. + * + * @param info Namespace information. 
+ */ + public void setNamespaceInfo(NamespaceInfo info) { + this.clusterId = info.getClusterID(); + this.blockPoolId = info.getBlockPoolID(); + this.registrationValid = true; + } + + public void setSafeMode(boolean safemode) { + this.safeMode = safemode; + } + + public boolean getSafemode() { + return this.safeMode; + } + + /** + * Set the datanode information. + * + * @param numLive Number of live nodes. + * @param numDead Number of dead nodes. + * @param numDecom Number of decommissioning nodes. + * @param numLiveDecom Number of decommissioned live nodes. + * @param numDeadDecom Number of decommissioned dead nodes. + */ + public void setDatanodeInfo(int numLive, int numDead, int numDecom, + int numLiveDecom, int numDeadDecom) { + this.liveDatanodes = numLive; + this.deadDatanodes = numDead; + this.decomDatanodes = numDecom; + this.liveDecomDatanodes = numLiveDecom; + this.deadDecomDatanodes = numDeadDecom; + this.statsValid = true; + } + + /** + * Get the number of live blocks. + * + * @return The number of dead nodes. + */ + public int getNumLiveDatanodes() { + return this.liveDatanodes; + } + + /** + * Get the number of dead blocks. + * + * @return The number of dead nodes. + */ + public int getNumDeadDatanodes() { + return this.deadDatanodes; + } + + /** + * Get the number of decommissionining nodes. + * + * @return The number of decommissionining nodes. + */ + public int getNumDecommissioningDatanodes() { + return this.decomDatanodes; + } + + /** + * Get the number of live decommissioned nodes. + * + * @return The number of live decommissioned nodes. + */ + public int getNumDecomLiveDatanodes() { + return this.liveDecomDatanodes; + } + + /** + * Get the number of dead decommissioned nodes. + * + * @return The number of dead decommissioned nodes. + */ + public int getNumDecomDeadDatanodes() { + return this.deadDecomDatanodes; + } + + /** + * Set the filesystem information. + * + * @param available Available capacity. + * @param total Total capacity. + * @param numFiles Number of files. + * @param numBlocks Total number of blocks. + * @param numBlocksMissing Number of missing blocks. + * @param numBlocksPendingReplication Number of blocks pending replication. + * @param numBlocksUnderReplicated Number of blocks under replication. + * @param numBlocksPendingDeletion Number of blocks pending deletion. + */ + public void setNamesystemInfo(long available, long total, + long numFiles, long numBlocks, long numBlocksMissing, + long numBlocksPendingReplication, long numBlocksUnderReplicated, + long numBlocksPendingDeletion, long providedSpace) { + this.totalSpace = total; + this.availableSpace = available; + this.numOfBlocks = numBlocks; + this.numOfBlocksMissing = numBlocksMissing; + this.numOfBlocksPendingReplication = numBlocksPendingReplication; + this.numOfBlocksUnderReplicated = numBlocksUnderReplicated; + this.numOfBlocksPendingDeletion = numBlocksPendingDeletion; + this.numOfFiles = numFiles; + this.statsValid = true; + this.providedSpace = providedSpace; + } + + /** + * Get the number of blocks. + * + * @return The number of blocks. + */ + public long getNumBlocks() { + return this.numOfBlocks; + } + + /** + * Get the number of files. + * + * @return The number of files. + */ + public long getNumFiles() { + return this.numOfFiles; + } + + /** + * Get the total space. + * + * @return The total space. + */ + public long getTotalSpace() { + return this.totalSpace; + } + + /** + * Get the available space. + * + * @return The available space. 
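
A sketch of how a heartbeat service might drive this report and how getState() derives the federation state; the addresses below are placeholders, not values from this patch:

package org.apache.hadoop.hdfs.server.federation.resolver;

import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;

public class StatusReportSketch {
  public static void main(String[] args) {
    NamenodeStatusReport report = new NamenodeStatusReport("ns0", "nn1",
        "nn1.example.com:8020", "nn1.example.com:8022",
        "nn1.example.com:8050", "nn1.example.com:50070");

    // Without a valid registration the resolver treats the NN as UNAVAILABLE.
    System.out.println(report.getState()); // UNAVAILABLE

    report.setRegistrationValid(true);
    report.setHAServiceState(HAServiceState.ACTIVE);
    // With a valid registration and a known HA state, the HA state wins.
    System.out.println(report.getState()); // ACTIVE
  }
}
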
+ */ + public long getAvailableSpace() { + return this.availableSpace; + } + + /** + * Get the space occupied by provided storage. + * + * @return The provided capacity. + */ + public long getProvidedSpace() { + return this.providedSpace; + } + + /** + * Get the number of missing blocks. + * + * @return Number of missing blocks. + */ + public long getNumBlocksMissing() { + return this.numOfBlocksMissing; + } + + /** + * Get the number of pending replication blocks. + * + * @return Number of pending replication blocks. + */ + public long getNumOfBlocksPendingReplication() { + return this.numOfBlocksPendingReplication; + } + + /** + * Get the number of under replicated blocks. + * + * @return Number of under replicated blocks. + */ + public long getNumOfBlocksUnderReplicated() { + return this.numOfBlocksUnderReplicated; + } + + /** + * Get the number of pending deletion blocks. + * + * @return Number of pending deletion blocks. + */ + public long getNumOfBlocksPendingDeletion() { + return this.numOfBlocksPendingDeletion; + } + + /** + * Set the validity of registration. + * + * @param isValid The desired value to be set. + */ + public void setRegistrationValid(boolean isValid) { + this.registrationValid = isValid; + } + + @Override + public String toString() { + return String.format("%s-%s:%s", + nameserviceId, namenodeId, serviceAddress); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java new file mode 100644 index 0000000..945d81d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A map of the properties and target destinations (name space + path) for + * a path in the global/federated name space. + * This data is generated from the {@link MountTable} records.
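+ * <p>
+ * For illustration (a hypothetical mount entry), an entry mapping /data to
+ * both ns0 and ns1 yields a location such as:
+ * <pre>
+ * List<RemoteLocation> dests = Arrays.asList(
+ *     new RemoteLocation("ns0", "/data"),
+ *     new RemoteLocation("ns1", "/data"));
+ * PathLocation loc = new PathLocation("/data", dests, DestinationOrder.HASH);
+ * loc.getDefaultLocation(); // ns0->/data
+ * </pre>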
+ */ +public class PathLocation { + + private static final Logger LOG = LoggerFactory.getLogger(PathLocation.class); + + + /** Source path in global namespace. */ + private final String sourcePath; + + /** Remote paths in the target name spaces. */ + private final List<RemoteLocation> destinations; + /** Order for the destinations. */ + private final DestinationOrder destOrder; + + + /** + * Create a new PathLocation. + * + * @param source Source path in the global name space. + * @param dest Destinations of the mount table entry. + * @param order Order of the locations. + */ + public PathLocation( + String source, List<RemoteLocation> dest, DestinationOrder order) { + this.sourcePath = source; + this.destinations = Collections.unmodifiableList(dest); + this.destOrder = order; + } + + /** + * Create a new PathLocation with default HASH order. + * + * @param source Source path in the global name space. + * @param dest Destinations of the mount table entry. + */ + public PathLocation(String source, List<RemoteLocation> dest) { + this(source, dest, DestinationOrder.HASH); + } + + /** + * Create a path location from another path. + * + * @param other Other path location to copy from. + */ + public PathLocation(final PathLocation other) { + this.sourcePath = other.sourcePath; + this.destinations = Collections.unmodifiableList(other.destinations); + this.destOrder = other.destOrder; + } + + /** + * Create a path location from another path with the destinations sorted. + * + * @param other Other path location to copy from. + * @param firstNsId Identifier of the namespace to place first. + */ + public PathLocation(PathLocation other, String firstNsId) { + this.sourcePath = other.sourcePath; + this.destOrder = other.destOrder; + this.destinations = orderedNamespaces(other.destinations, firstNsId); + } + + /** + * Prioritize a location/destination by its name space/nameserviceId. + * This destination might be used by other threads, so the source is not + * modifiable. + * + * @param original List of destinations to order. + * @param nsId The name space/nameserviceID to prioritize. + * @return Prioritized list of destinations that cannot be modified. + */ + private static List<RemoteLocation> orderedNamespaces( + final List<RemoteLocation> original, final String nsId) { + if (original.size() <= 1) { + return original; + } + + LinkedList<RemoteLocation> newDestinations = new LinkedList<>(); + boolean found = false; + for (RemoteLocation dest : original) { + if (dest.getNameserviceId().equals(nsId)) { + found = true; + newDestinations.addFirst(dest); + } else { + newDestinations.add(dest); + } + } + + if (!found) { + LOG.debug("Cannot find location with namespace {} in {}", + nsId, original); + } + return Collections.unmodifiableList(newDestinations); + } + + /** + * Get the source path in the global namespace for this path location. + * + * @return The path in the global namespace. + */ + public String getSourcePath() { + return this.sourcePath; + } + + /** + * Get the subclusters defined for the destinations. + * + * @return Set containing the subclusters.
+ */ + public Set<String> getNamespaces() { + Set<String> namespaces = new HashSet<>(); + List<RemoteLocation> locations = this.getDestinations(); + for (RemoteLocation location : locations) { + String nsId = location.getNameserviceId(); + namespaces.add(nsId); + } + return namespaces; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (RemoteLocation destination : this.destinations) { + String nsId = destination.getNameserviceId(); + String path = destination.getDest(); + if (sb.length() > 0) { + sb.append(","); + } + sb.append(nsId + "->" + path); + } + if (this.destinations.size() > 1) { + sb.append(" ["); + sb.append(this.destOrder.toString()); + sb.append("]"); + } + return sb.toString(); + } + + /** + * Check if this location supports multiple clusters/paths. + * + * @return True if it has multiple destinations. + */ + public boolean hasMultipleDestinations() { + return this.destinations.size() > 1; + } + + /** + * Get the list of locations found in the mount table. + * The first result is the highest priority path. + * + * @return List of remote locations. + */ + public List<RemoteLocation> getDestinations() { + return Collections.unmodifiableList(this.destinations); + } + + /** + * Get the order for the destinations. + * + * @return Order for the destinations. + */ + public DestinationOrder getDestinationOrder() { + return this.destOrder; + } + + /** + * Get the default or highest priority location. + * + * @return The default location. + */ + public RemoteLocation getDefaultLocation() { + if (destinations.isEmpty() || destinations.get(0).getDest() == null) { + throw new UnsupportedOperationException( + "Unsupported path " + sourcePath + " please check mount table"); + } + return destinations.get(0); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java new file mode 100644 index 0000000..6aa12ce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext; + +/** + * A single location in a remote namespace consisting of a nameservice ID + * and a HDFS path.
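+ * <p>
+ * A brief sketch (values illustrative):
+ * <pre>
+ * RemoteLocation loc = new RemoteLocation("ns0", "/tmp");
+ * loc.getNameserviceId(); // "ns0" (no namenode identifier given)
+ * loc.getDest();          // "/tmp"
+ * loc.toString();         // "ns0->/tmp"
+ * </pre>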
+ */ +public class RemoteLocation extends RemoteLocationContext { + + /** Identifier of the remote namespace for this location. */ + private final String nameserviceId; + /** Identifier of the namenode in the namespace for this location. */ + private final String namenodeId; + /** Path in the remote location. */ + private final String path; + + /** + * Create a new remote location. + * + * @param nsId Destination namespace. + * @param pPath Path in the destination namespace. + */ + public RemoteLocation(String nsId, String pPath) { + this(nsId, null, pPath); + } + + /** + * Create a new remote location pointing to a particular namenode in the + * namespace. + * + * @param nsId Destination namespace. + * @param nnId Identifier of the namenode in the destination namespace. + * @param pPath Path in the destination namespace. + */ + public RemoteLocation(String nsId, String nnId, String pPath) { + this.nameserviceId = nsId; + this.namenodeId = nnId; + this.path = pPath; + } + + @Override + public String getNameserviceId() { + String ret = this.nameserviceId; + if (this.namenodeId != null) { + ret += "-" + this.namenodeId; + } + return ret; + } + + @Override + public String getDest() { + return this.path; + } + + @Override + public String toString() { + return getNameserviceId() + "->" + this.path; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java new file mode 100644 index 0000000..03e68e5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +/** + * Order of the destinations when we have multiple of them. When the resolver + * of files to subclusters (FileSubclusterResolver) has multiple destinations, + * this determines which location should be checked first.
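+ * <p>
+ * For example (illustrative), with /data mapped to subclusters ns0 and ns1:
+ * <pre>
+ * // HASH:     /data/app1/* all resolve to whichever subcluster "app1"
+ * //           hashes to; only the first folder level is hashed.
+ * // HASH_ALL: consistent hashing is applied at every path level.
+ * // LOCAL:    the local subcluster is checked first.
+ * // RANDOM:   destinations are checked in random order.
+ * </pre>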
+ */ +public enum DestinationOrder { + HASH, // Follow consistent hashing in the first folder level + LOCAL, // Local first + RANDOM, // Random order + HASH_ALL // Follow consistent hashing +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java new file mode 100644 index 0000000..831b082 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; + +/** + * Variation of HashResolver that only uses the first level of the path. + */ +public class HashFirstResolver extends HashResolver { + + @Override + public String getFirstNamespace(final String path, final PathLocation loc) { + String srcPath = loc.getSourcePath(); + String trimmedPath = trimPathToChild(path, srcPath); + LOG.debug("Only using the first part of the path: {} -> {}", + path, trimmedPath); + return super.getFirstNamespace(trimmedPath, loc); + } + + /** + * Hash only up to the immediate child of the mount point. This prevents the + * need to create/maintain subtrees under each multi-destination mount point. + * Each child of a multi-destination mount is mapped to only one hash + * location. + *
+ * <p>
+ * Trims a path to at most the immediate child of a parent path. For example:
+ * <ul>
+ * <li>path = /a/b/c, parent = /a will be trimmed to /a/b.
+ * <li>path = /a/b, parent = /a/b will be trimmed to /a/b.
+ * </ul>
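+ * <p>
+ * As a consequence (an illustrative sketch of the helper below), sibling
+ * files share the same hash input and land in the same subcluster:
+ * <pre>
+ * trimPathToChild("/a/b/file1", "/a"); // returns "/a/b"
+ * trimPathToChild("/a/b/file2", "/a"); // returns "/a/b" as well
+ * </pre>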
+ * + * @param path The path to trim. + * @param parent The parent used to find the immediate child. + * @return Trimmed path. + */ + private static String trimPathToChild(String path, String parent) { + // Path is invalid or equal to the parent + if (path.length() <= parent.length()) { + return parent; + } + String remainder = path.substring(parent.length()); + String[] components = + remainder.replaceFirst("^/", "").split(Path.SEPARATOR); + if (components.length > 0 && components[0].length() > 0) { + if (parent.endsWith(Path.SEPARATOR)) { + return parent + components[0]; + } else { + return parent + Path.SEPARATOR + components[0]; + } + } else { + return parent; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java new file mode 100644 index 0000000..4034a46 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.utils.ConsistentHashRing; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Order the destinations based on consistent hashing. + */ +public class HashResolver implements OrderedResolver { + + protected static final Logger LOG = + LoggerFactory.getLogger(HashResolver.class); + + + /** Namespace set hash -> Locator. */ + private final Map<Integer, ConsistentHashRing> hashResolverMap; + + /** Patterns for temporary files.
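+ * These match common temporary naming schemes so that a temporary file and
+ * its final name hash to the same subcluster. For example (illustrative):
+ * <pre>
+ * extractTempFileName("part-00000._COPYING_"); // "part-00000"
+ * extractTempFileName("_temp/part-00000");     // "part-00000"
+ * </pre>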
+ */ + private static final String HEX_PATTERN = "\\p{XDigit}"; + private static final String UUID_PATTERN = HEX_PATTERN + "{8}-" + + HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" + + HEX_PATTERN + "{12}"; + private static final String ATTEMPT_PATTERN = + "attempt_\\d+_\\d{4}_._\\d{6}_\\d{2}"; + private static final String[] TEMP_FILE_PATTERNS = { + "(.+)\\.COPYING$", + "(.+)\\._COPYING_.*$", + "(.+)\\.tmp$", + "_temp/(.+)$", + "_temporary/(.+)\\." + UUID_PATTERN + "$", + "(.*)_temporary/\\d/_temporary/" + ATTEMPT_PATTERN + "/(.+)$" }; + /** Pattern for temporary files (logical OR of the individual patterns). */ + private static final Pattern TEMP_FILE_PATTERN = + Pattern.compile(StringUtils.join("|", TEMP_FILE_PATTERNS)); + + + public HashResolver() { + this.hashResolverMap = new ConcurrentHashMap<>(); + } + + /** + * Use the result from the consistent hashing locator to prioritize the + * locations for a path. + * + * @param path Path to check. + * @param loc Federated location with multiple destinations. + * @return First namespace based on hash. + */ + @Override + public String getFirstNamespace(final String path, final PathLocation loc) { + String finalPath = extractTempFileName(path); + Set<String> namespaces = loc.getNamespaces(); + ConsistentHashRing locator = getHashResolver(namespaces); + String hashedSubcluster = locator.getLocation(finalPath); + if (hashedSubcluster == null) { + String srcPath = loc.getSourcePath(); + LOG.error("Cannot find subcluster for {} ({} -> {})", + srcPath, path, finalPath); + } + LOG.debug("Namespace for {} ({}) is {}", path, finalPath, hashedSubcluster); + return hashedSubcluster; + } + + /** + * Get the cached (if available) or generate a new hash resolver for this + * particular set of unique namespace identifiers. + * + * @param namespaces A set of unique namespace identifiers. + * @return A hash resolver configured to consistently resolve paths to + * namespaces using the provided set of namespace identifiers. + */ + private ConsistentHashRing getHashResolver(final Set<String> namespaces) { + int hash = namespaces.hashCode(); + ConsistentHashRing resolver = this.hashResolverMap.get(hash); + if (resolver == null) { + resolver = new ConsistentHashRing(namespaces); + this.hashResolverMap.put(hash, resolver); + } + return resolver; + } + + /** + * Some files use a temporary naming pattern. Extract the final name from the + * temporary name. For example, files *._COPYING_ will be renamed, so we + * remove that chunk. + * + * @param input Input string. + * @return Final file name. + */ + @VisibleForTesting + public static String extractTempFileName(final String input) { + StringBuilder sb = new StringBuilder(); + Matcher matcher = TEMP_FILE_PATTERN.matcher(input); + if (matcher.find()) { + for (int i = 1; i <= matcher.groupCount(); i++) { + String match = matcher.group(i); + if (match != null) { + sb.append(match); + } + } + } + if (sb.length() > 0) { + String ret = sb.toString(); + LOG.debug("Extracted {} from {}", ret, input); + return ret; + } + return input; + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org