Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6EA42200D20 for ; Tue, 3 Oct 2017 03:35:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6CE46160BD7; Tue, 3 Oct 2017 01:35:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 17E2D1609EF for ; Tue, 3 Oct 2017 03:35:19 +0200 (CEST) Received: (qmail 18226 invoked by uid 500); 3 Oct 2017 01:35:02 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 17207 invoked by uid 99); 3 Oct 2017 01:35:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Oct 2017 01:35:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13703F5C3C; Tue, 3 Oct 2017 01:34:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Tue, 03 Oct 2017 01:35:02 -0000 Message-Id: <020b1e5a59a646dbb140b2ea5b70e81c@git.apache.org> In-Reply-To: <42bd84e35918485fb08b5bc36f0f047d@git.apache.org> References: <42bd84e35918485fb08b5bc36f0f047d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/29] hadoop git commit: HDFS-10687. Federation Membership State Store internal API. Contributed by Jason Kace and Inigo Goiri. archived-at: Tue, 03 Oct 2017 01:35:22 -0000 HDFS-10687. Federation Membership State Store internal API. Contributed by Jason Kace and Inigo Goiri. 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a67299a6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a67299a6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a67299a6 Branch: refs/heads/HDFS-10467 Commit: a67299a6ab90bb16a521d08753b4f2942b777cff Parents: 317af56 Author: Inigo Goiri Authored: Mon Jul 31 10:55:21 2017 -0700 Committer: Inigo Goiri Committed: Mon Oct 2 18:33:14 2017 -0700 ---------------------------------------------------------------------- .../dev-support/findbugsExcludeFile.xml | 3 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 1 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 17 +- .../resolver/MembershipNamenodeResolver.java | 290 ++++++++++++ .../federation/router/FederationUtil.java | 42 +- .../federation/store/CachedRecordStore.java | 237 ++++++++++ .../federation/store/MembershipStore.java | 126 +++++ .../federation/store/StateStoreCache.java | 36 ++ .../store/StateStoreCacheUpdateService.java | 67 +++ .../federation/store/StateStoreService.java | 202 +++++++- .../store/impl/MembershipStoreImpl.java | 311 +++++++++++++ .../federation/store/impl/package-info.java | 31 ++ .../GetNamenodeRegistrationsRequest.java | 52 +++ .../GetNamenodeRegistrationsResponse.java | 55 +++ .../store/protocol/GetNamespaceInfoRequest.java | 30 ++ .../protocol/GetNamespaceInfoResponse.java | 52 +++ .../protocol/NamenodeHeartbeatRequest.java | 52 +++ .../protocol/NamenodeHeartbeatResponse.java | 49 ++ .../UpdateNamenodeRegistrationRequest.java | 72 +++ .../UpdateNamenodeRegistrationResponse.java | 51 ++ .../impl/pb/FederationProtocolPBTranslator.java | 145 ++++++ .../GetNamenodeRegistrationsRequestPBImpl.java | 87 ++++ .../GetNamenodeRegistrationsResponsePBImpl.java | 99 ++++ .../impl/pb/GetNamespaceInfoRequestPBImpl.java | 60 +++ .../impl/pb/GetNamespaceInfoResponsePBImpl.java | 95 ++++ .../impl/pb/NamenodeHeartbeatRequestPBImpl.java | 93 ++++ 
.../pb/NamenodeHeartbeatResponsePBImpl.java | 71 +++ ...UpdateNamenodeRegistrationRequestPBImpl.java | 95 ++++ ...pdateNamenodeRegistrationResponsePBImpl.java | 73 +++ .../store/protocol/impl/pb/package-info.java | 29 ++ .../store/records/MembershipState.java | 329 +++++++++++++ .../store/records/MembershipStats.java | 126 +++++ .../records/impl/pb/MembershipStatePBImpl.java | 334 +++++++++++++ .../records/impl/pb/MembershipStatsPBImpl.java | 191 ++++++++ .../src/main/proto/FederationProtocol.proto | 107 +++++ .../src/main/resources/hdfs-default.xml | 18 +- .../resolver/TestNamenodeResolver.java | 284 ++++++++++++ .../store/FederationStateStoreTestUtils.java | 23 +- .../federation/store/TestStateStoreBase.java | 81 ++++ .../store/TestStateStoreMembershipState.java | 463 +++++++++++++++++++ .../store/driver/TestStateStoreDriverBase.java | 69 ++- .../store/records/TestMembershipState.java | 129 ++++++ 42 files changed, 4745 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 9582fcb..4b958b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -15,6 +15,9 @@ + + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 425572f..cc7a975 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -331,6 +331,7 @@ 
http://maven.apache.org/xsd/maven-4.0.0.xsd"> QJournalProtocol.proto editlog.proto fsimage.proto + FederationProtocol.proto http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index c7b4c01..b50c538 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; @@ -1166,8 +1168,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "org.apache.hadoop.hdfs.server.federation.MockResolver"; public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS = FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class"; - public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT = - "org.apache.hadoop.hdfs.server.federation.MockResolver"; + public static final Class + 
FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT = + MembershipNamenodeResolver.class; // HDFS Router-based federation State Store public static final String FEDERATION_STORE_PREFIX = @@ -1189,6 +1192,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT = TimeUnit.MINUTES.toMillis(1); + public static final String DFS_ROUTER_CACHE_TIME_TO_LIVE_MS = + FEDERATION_ROUTER_PREFIX + "cache.ttl"; + public static final long DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + + public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS = + FEDERATION_STORE_PREFIX + "membership.expiration"; + public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(5); + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java new file mode 100644 index 0000000..b0ced24 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE; +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED; +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import 
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a cached lookup of the most recently active namenode for a + * particular nameservice. Relies on the {@link StateStoreService} to + * discover available nameservices and namenodes. + */ +public class MembershipNamenodeResolver + implements ActiveNamenodeResolver, StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(MembershipNamenodeResolver.class); + + /** Reference to the State Store. */ + private final StateStoreService stateStore; + /** Membership State Store interface. */ + private final MembershipStore membershipInterface; + + /** Parent router ID. */ + private String routerId; + + /** Cached lookup of NN for nameservice. Invalidated on cache refresh. */ + private Map> cacheNS; + /** Cached lookup of NN for block pool. Invalidated on cache refresh. 
*/ + private Map> cacheBP; + + + public MembershipNamenodeResolver( + Configuration conf, StateStoreService store) throws IOException { + this.stateStore = store; + + this.cacheNS = new ConcurrentHashMap<>(); + this.cacheBP = new ConcurrentHashMap<>(); + + if (this.stateStore != null) { + // Request cache updates from the state store + this.stateStore.registerCacheExternal(this); + + // Initialize the interface to get the membership + this.membershipInterface = this.stateStore.getRegisteredRecordStore( + MembershipStore.class); + } else { + this.membershipInterface = null; + } + + if (this.membershipInterface == null) { + throw new IOException("State Store does not have an interface for " + + MembershipStore.class.getSimpleName()); + } + } + + @Override + public boolean loadCache(boolean force) { + // Our cache depends on the store, update it first + try { + this.membershipInterface.loadCache(force); + } catch (IOException e) { + LOG.error("Cannot update membership from the State Store", e); + } + + // Force refresh of active NN cache + cacheBP.clear(); + cacheNS.clear(); + return true; + } + + @Override + public void updateActiveNamenode( + final String nsId, final InetSocketAddress address) throws IOException { + + // Called when we have an RPC miss and successful hit on an alternate NN. + // Temporarily update our cache, it will be overwritten on the next update. 
+ try { + MembershipState partial = MembershipState.newInstance(); + String rpcAddress = address.getHostName() + ":" + address.getPort(); + partial.setRpcAddress(rpcAddress); + partial.setNameserviceId(nsId); + + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + + GetNamenodeRegistrationsResponse response = + this.membershipInterface.getNamenodeRegistrations(request); + List records = response.getNamenodeMemberships(); + + if (records != null && records.size() == 1) { + MembershipState record = records.get(0); + UpdateNamenodeRegistrationRequest updateRequest = + UpdateNamenodeRegistrationRequest.newInstance( + record.getNameserviceId(), record.getNamenodeId(), ACTIVE); + this.membershipInterface.updateNamenodeRegistration(updateRequest); + } + } catch (StateStoreUnavailableException e) { + LOG.error("Cannot update {} as active, State Store unavailable", address); + } + } + + @Override + public List getNamenodesForNameserviceId( + final String nsId) throws IOException { + + List ret = cacheNS.get(nsId); + if (ret == null) { + try { + MembershipState partial = MembershipState.newInstance(); + partial.setNameserviceId(nsId); + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + + final List result = + getRecentRegistrationForQuery(request, true, false); + if (result == null || result.isEmpty()) { + LOG.error("Cannot locate eligible NNs for {}", nsId); + return null; + } else { + cacheNS.put(nsId, result); + ret = result; + } + } catch (StateStoreUnavailableException e) { + LOG.error("Cannot get active NN for {}, State Store unavailable", nsId); + } + } + if (ret == null) { + return null; + } + return Collections.unmodifiableList(ret); + } + + @Override + public List getNamenodesForBlockPoolId( + final String bpId) throws IOException { + + List ret = cacheBP.get(bpId); + if (ret == null) { + try { + MembershipState partial = MembershipState.newInstance(); + 
partial.setBlockPoolId(bpId); + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + + final List result = + getRecentRegistrationForQuery(request, true, false); + if (result == null || result.isEmpty()) { + LOG.error("Cannot locate eligible NNs for {}", bpId); + } else { + cacheBP.put(bpId, result); + ret = result; + } + } catch (StateStoreUnavailableException e) { + LOG.error("Cannot get active NN for {}, State Store unavailable", bpId); + return null; + } + } + if (ret == null) { + return null; + } + return Collections.unmodifiableList(ret); + } + + @Override + public boolean registerNamenode(NamenodeStatusReport report) + throws IOException { + + if (this.routerId == null) { + LOG.warn("Cannot register namenode, router ID is not known {}", report); + return false; + } + + MembershipState record = MembershipState.newInstance( + routerId, report.getNameserviceId(), report.getNamenodeId(), + report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(), + report.getServiceAddress(), report.getLifelineAddress(), + report.getWebAddress(), report.getState(), report.getSafemode()); + + if (report.getState() != UNAVAILABLE) { + // Set/update our last contact time + record.setLastContact(Time.now()); + } + + NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance(); + request.setNamenodeMembership(record); + return this.membershipInterface.namenodeHeartbeat(request).getResult(); + } + + @Override + public Set getNamespaces() throws IOException { + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + this.membershipInterface.getNamespaceInfo(request); + return response.getNamespaceInfo(); + } + + /** + * Picks the most relevant record registration that matches the query. 
Return + * registrations matching the query in this preference: 1) Most recently + * updated ACTIVE registration 2) Most recently updated STANDBY registration + * (if showStandby) 3) Most recently updated UNAVAILABLE registration (if + * showUnavailable). EXPIRED registrations are ignored. + * + * @param query The select query for NN registrations. + * @param excludes List of NNs to exclude from matching results. + * @param addUnavailable include UNAVAILABLE registrations. + * @param addExpired include EXPIRED registrations. + * @return List of memberships or null if no registrations that + * both match the query AND the selected states. + * @throws IOException + */ + private List getRecentRegistrationForQuery( + GetNamenodeRegistrationsRequest request, boolean addUnavailable, + boolean addExpired) throws IOException { + + // Retrieve a list of all registrations that match this query. + // This may include all NN records for a namespace/blockpool, including + // duplicate records for the same NN from different routers. 
+ GetNamenodeRegistrationsResponse response = + this.membershipInterface.getNamenodeRegistrations(request); + + List memberships = response.getNamenodeMemberships(); + if (!addExpired || !addUnavailable) { + Iterator iterator = memberships.iterator(); + while (iterator.hasNext()) { + MembershipState membership = iterator.next(); + if (membership.getState() == EXPIRED && !addExpired) { + iterator.remove(); + } else if (membership.getState() == UNAVAILABLE && !addUnavailable) { + iterator.remove(); + } + } + } + + List priorityList = new ArrayList<>(); + priorityList.addAll(memberships); + Collections.sort(priorityList, new NamenodePriorityComparator()); + + LOG.debug("Selected most recent NN {} for query", priorityList); + return priorityList; + } + + @Override + public void setRouterId(String router) { + this.routerId = router; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java index 6e7e865..0129a37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -19,26 +19,57 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.lang.reflect.Constructor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import 
org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utilities for managing HDFS federation. */ public final class FederationUtil { - private static final Log LOG = LogFactory.getLog(FederationUtil.class); + private static final Logger LOG = + LoggerFactory.getLogger(FederationUtil.class); private FederationUtil() { // Utility Class } /** + * Create an instance of an interface with a constructor using a context. + * + * @param conf Configuration for the class names. + * @param context Context object to pass to the instance. + * @param contextClass Type of the context passed to the constructor. + * @param clazz Class of the object to return. + * @return New instance of the specified class that implements the desired + * interface and a single parameter constructor containing a + * StateStore reference. + */ + private static T newInstance(final Configuration conf, + final R context, final Class contextClass, final Class clazz) { + try { + if (contextClass == null) { + // Default constructor if no context + Constructor constructor = clazz.getConstructor(); + return constructor.newInstance(); + } else { + // Constructor with context + Constructor constructor = clazz.getConstructor( + Configuration.class, contextClass); + return constructor.newInstance(conf, context); + } + } catch (ReflectiveOperationException e) { + LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e); + return null; + } + } + + /** * Create an instance of an interface with a constructor using a state store * constructor. * @@ -105,13 +136,14 @@ public final class FederationUtil { * * @param conf Configuration that defines the namenode resolver class. * @param obj Context object passed to class constructor. - * @return ActiveNamenodeResolver + * @return New active namenode resolver. 
*/ public static ActiveNamenodeResolver newActiveNamenodeResolver( Configuration conf, StateStoreService stateStore) { - return newInstance(conf, stateStore, StateStoreService.class, + Class clazz = conf.getClass( DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT, ActiveNamenodeResolver.class); + return newInstance(conf, stateStore, StateStoreService.class, clazz); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java new file mode 100644 index 0000000..90a6699 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Record store that takes care of caching the records in memory. + * + * @param Record to store by this interface. + */ +public abstract class CachedRecordStore + extends RecordStore implements StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(CachedRecordStore.class); + + + /** Prevent loading the cache more than once every 500 ms. */ + private static final long MIN_UPDATE_MS = 500; + + + /** Cached entries. */ + private List records = new ArrayList<>(); + + /** Time stamp of the cached entries. */ + private long timestamp = -1; + + /** If the cache is initialized. */ + private boolean initialized = false; + + /** Last time the cache was updated. */ + private long lastUpdate = -1; + + /** Lock to access the memory cache. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + /** If it should override the expired values when loading the cache. */ + private boolean override = false; + + + /** + * Create a new cached record store. + * + * @param clazz Class of the record to store. + * @param driver State Store driver. 
+ */ + protected CachedRecordStore(Class clazz, StateStoreDriver driver) { + this(clazz, driver, false); + } + + /** + * Create a new cached record store. + * + * @param clazz Class of the record to store. + * @param driver State Store driver. + * @param over If the entries should be overridden if they expire + */ + protected CachedRecordStore( + Class clazz, StateStoreDriver driver, boolean over) { + super(clazz, driver); + + this.override = over; + } + + /** + * Check that the cache of the State Store information is available. + * + * @throws StateStoreUnavailableException If the cache is not initialized. + */ + private void checkCacheAvailable() throws StateStoreUnavailableException { + if (!this.initialized) { + throw new StateStoreUnavailableException( + "Cached State Store not initialized, " + + getRecordClass().getSimpleName() + " records not valid"); + } + } + + @Override + public boolean loadCache(boolean force) throws IOException { + // Prevent loading the cache too frequently + if (force || isUpdateTime()) { + List newRecords = null; + long t = -1; + try { + QueryResult result = getDriver().get(getRecordClass()); + newRecords = result.getRecords(); + t = result.getTimestamp(); + + // If we have any expired record, update the State Store + if (this.override) { + overrideExpiredRecords(result); + } + } catch (IOException e) { + LOG.error("Cannot get \"{}\" records from the State Store", + getRecordClass().getSimpleName()); + this.initialized = false; + return false; + } + + // Update cache atomically + writeLock.lock(); + try { + this.records.clear(); + this.records.addAll(newRecords); + this.timestamp = t; + this.initialized = true; + } finally { + writeLock.unlock(); + } + + lastUpdate = Time.monotonicNow(); + } + return true; + } + + /** + * Check if it's time to update the cache. Update it if it was never updated. + * + * @return If it's time to update this cache. 
+ */ + private boolean isUpdateTime() { + return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS; + } + + /** + * Updates the state store with any record overrides we detected, such as an + * expired state. + * + * @param query RecordQueryResult containing the data to be inspected. + * @param clazz Type of objects contained in the query. + * @throws IOException + */ + public void overrideExpiredRecords(QueryResult query) throws IOException { + List commitRecords = new ArrayList<>(); + List newRecords = query.getRecords(); + long currentDriverTime = query.getTimestamp(); + if (newRecords == null || currentDriverTime <= 0) { + LOG.error("Cannot check overrides for record"); + return; + } + for (R record : newRecords) { + if (record.checkExpired(currentDriverTime)) { + String recordName = StateStoreUtils.getRecordName(record.getClass()); + LOG.info("Override State Store record {}: {}", recordName, record); + commitRecords.add(record); + } + } + if (commitRecords.size() > 0) { + getDriver().putAll(commitRecords, true, false); + } + } + + /** + * Updates the state store with any record overrides we detected, such as an + * expired state. + * + * @param driver State store driver for the data store. + * @param record Record to be updated. + * @param clazz Type of data record. + * @throws IOException + */ + public void overrideExpiredRecord(R record) throws IOException { + List newRecords = Collections.singletonList(record); + long time = getDriver().getTime(); + QueryResult query = new QueryResult<>(newRecords, time); + overrideExpiredRecords(query); + } + + /** + * Get all the cached records. + * + * @return Copy of the cached records. + * @throws StateStoreUnavailableException If the State store is not available. 
+ */ + public List getCachedRecords() throws StateStoreUnavailableException { + checkCacheAvailable(); + + List ret = new LinkedList(); + this.readLock.lock(); + try { + ret.addAll(this.records); + } finally { + this.readLock.unlock(); + } + return ret; + } + + /** + * Get all the cached records and the time stamp of the cache. + * + * @return Copy of the cached records and the time stamp. + * @throws StateStoreUnavailableException If the State store is not available. + */ + protected QueryResult getCachedRecordsAndTimeStamp() + throws StateStoreUnavailableException { + checkCacheAvailable(); + + this.readLock.lock(); + try { + return new QueryResult(this.records, this.timestamp); + } finally { + this.readLock.unlock(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java new file mode 100644 index 0000000..3e8ba6b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; + +/** + * Management API for NameNode registrations stored in + * {@link org.apache.hadoop.hdfs.server.federation.store.records.MembershipState + * MembershipState} records. The {@link org.apache.hadoop.hdfs.server. + * federation.router.RouterHeartbeatService RouterHeartbeatService} periodically + * polls each NN to update the NameNode metadata(addresses, operational) and HA + * state(active, standby). 
Each NameNode may be polled by multiple + * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} + * instances. + *

+ * Once fetched from the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver}, NameNode registrations are cached until the next query. + * The fetched registration data is aggregated using a quorum to determine the + * best/most accurate state for each NameNode. The cache is periodically updated + * by the {@link StateStoreCacheUpdateService}. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class MembershipStore + extends CachedRecordStore { + + protected MembershipStore(StateStoreDriver driver) { + super(MembershipState.class, driver, true); + } + + /** + * Inserts or updates a namenode membership entry into the table. + * + * @param request Fully populated NamenodeHeartbeatRequest request. + * @return True if successful, false otherwise. + * @throws StateStoreUnavailableException Throws exception if the data store + * is not initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract NamenodeHeartbeatResponse namenodeHeartbeat( + NamenodeHeartbeatRequest request) throws IOException; + + /** + * Queries for a single cached registration entry matching the given + * parameters. Possible keys are the names of data structure elements. Possible + * values are matching SQL "LIKE" targets. + * + * @param request Fully populated GetNamenodeRegistrationsRequest request. + * @return Single matching FederationMembershipStateEntry or null if not found + * or more than one entry matches. + * @throws StateStoreUnavailableException Throws exception if the data store + * is not initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract GetNamenodeRegistrationsResponse getNamenodeRegistrations( + GetNamenodeRegistrationsRequest request) throws IOException; + + /** + * Get the expired registrations from the registration cache. 
+ * + * @return Expired registrations or zero-length list if none are found. + * @throws StateStoreUnavailableException Throws exception if the data store + * is not initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract GetNamenodeRegistrationsResponse + getExpiredNamenodeRegistrations(GetNamenodeRegistrationsRequest request) + throws IOException; + + /** + * Retrieves a list of registered nameservices and their associated info. + * + * @param request + * @return Collection of information for each registered nameservice. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract GetNamespaceInfoResponse getNamespaceInfo( + GetNamespaceInfoRequest request) throws IOException; + + /** + * Overrides a cached namenode state with an updated state. + * + * @param request Fully populated OverrideNamenodeRegistrationRequest request. + * @return OverrideNamenodeRegistrationResponse + * @throws StateStoreUnavailableException if the data store is not + * initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. 
+ */ + public abstract UpdateNamenodeRegistrationResponse updateNamenodeRegistration( + UpdateNamenodeRegistrationRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java new file mode 100644 index 0000000..83fc501 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +/** + * Interface for a cached copy of the State Store. + */ +public interface StateStoreCache { + + /** + * Load the cache from the State Store. Called by the cache update service + * when the data has been reloaded. 
+ * + * @param force If we force the load. + * @return If the cache was loaded successfully. + * @throws IOException If there was an error loading the cache. + */ + boolean loadCache(boolean force) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java new file mode 100644 index 0000000..bb8cfb0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically update the {@link StateStoreService} + * cached information in the + * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}. + * This is for performance and removes the State Store from the critical path + * in common operations. + */ +public class StateStoreCacheUpdateService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreCacheUpdateService.class); + + /** The service that manages the State Store connection. */ + private final StateStoreService stateStore; + + + /** + * Create a new Cache update service. + * + * @param stateStore Implementation of the state store + */ + public StateStoreCacheUpdateService(StateStoreService stateStore) { + super(StateStoreCacheUpdateService.class.getSimpleName()); + this.stateStore = stateStore; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + this.setIntervalMs(conf.getLong( + DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT)); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + LOG.debug("Updating State Store cache"); + stateStore.refreshCaches(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index df207e0..73f607f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -18,17 +18,24 @@ package org.apache.hadoop.hdfs.server.federation.store; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +61,9 @@ import com.google.common.annotations.VisibleForTesting; * federation. *

  • {@link MountTableStore}: Mount table between to subclusters. * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. + *
  • {@link RebalancerStore}: Log of the rebalancing operations. + *
  • {@link RouterStore}: Router state in the federation. + *
  • {@link TokenStore}: Tokens in the federation. * */ @InterfaceAudience.Private @@ -77,8 +87,30 @@ public class StateStoreService extends CompositeService { private StateStoreConnectionMonitorService monitorService; + /** Supported record stores. */ + private final Map< + Class, RecordStore> + recordStores; + + /** Service to maintain State Store caches. */ + private StateStoreCacheUpdateService cacheUpdater; + /** Time the cache was last successfully updated. */ + private long cacheLastUpdateTime; + /** List of internal caches to update. */ + private final List cachesToUpdateInternal; + /** List of external caches to update. */ + private final List cachesToUpdateExternal; + + public StateStoreService() { super(StateStoreService.class.getName()); + + // Records and stores supported by this implementation + this.recordStores = new HashMap<>(); + + // Caches to maintain + this.cachesToUpdateInternal = new ArrayList<>(); + this.cachesToUpdateExternal = new ArrayList<>(); } /** @@ -102,10 +134,22 @@ public class StateStoreService extends CompositeService { throw new IOException("Cannot create driver for the State Store"); } + // Add supported record stores + addRecordStore(MembershipStoreImpl.class); + // Check the connection to the State Store periodically this.monitorService = new StateStoreConnectionMonitorService(this); this.addService(monitorService); + // Set expirations intervals for each record + MembershipState.setExpirationMs(conf.getLong( + DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, + DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT)); + + // Cache update service + this.cacheUpdater = new StateStoreCacheUpdateService(this); + addService(this.cacheUpdater); + super.serviceInit(this.conf); } @@ -123,13 +167,56 @@ public class StateStoreService extends CompositeService { } /** + * Add a record store to the State Store. It includes adding the store, the + * supported record and the cache management. 
+ * + * @param clazz Class of the record store to track. + * @throws ReflectiveOperationException If the record store cannot be + * instantiated. + */ + private > void addRecordStore( + final Class clazz) throws ReflectiveOperationException { + + assert this.getServiceState() == STATE.INITED : + "Cannot add record to the State Store once started"; + + T recordStore = RecordStore.newInstance(clazz, this.getDriver()); + Class recordClass = recordStore.getRecordClass(); + this.recordStores.put(recordClass, recordStore); + + // Subscribe for cache updates + if (recordStore instanceof StateStoreCache) { + StateStoreCache cachedRecordStore = (StateStoreCache) recordStore; + this.cachesToUpdateInternal.add(cachedRecordStore); + } + } + + /** + * Get the record store in this State Store for a given interface. + * + * @param recordStoreClass Class of the record store. + * @return Registered record store or null if not found. + */ + public > T getRegisteredRecordStore( + final Class recordStoreClass) { + for (RecordStore recordStore : + this.recordStores.values()) { + if (recordStoreClass.isInstance(recordStore)) { + @SuppressWarnings("unchecked") + T recordStoreChecked = (T) recordStore; + return recordStoreChecked; + } + } + return null; + } + + /** * List of records supported by this State Store. * * @return List of supported record classes. 
*/ public Collection> getSupportedRecords() { - // TODO add list of records - return new LinkedList<>(); + return this.recordStores.keySet(); } /** @@ -142,6 +229,7 @@ public class StateStoreService extends CompositeService { if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) { LOG.info("Connection to the State Store driver {} is open and ready", driverName); + this.refreshCaches(); } else { LOG.error("Cannot initialize State Store driver {}", driverName); } @@ -198,4 +286,114 @@ public class StateStoreService extends CompositeService { this.identifier = id; } + // + // Cached state store data + // + /** + * The last time the state store cache was fully updated. + * + * @return Timestamp. + */ + public long getCacheUpdateTime() { + return this.cacheLastUpdateTime; + } + + /** + * Stops the cache update service. + */ + @VisibleForTesting + public void stopCacheUpdateService() { + if (this.cacheUpdater != null) { + this.cacheUpdater.stop(); + removeService(this.cacheUpdater); + this.cacheUpdater = null; + } + } + + /** + * Register a cached record store for automatic periodic cache updates. + * + * @param client Client to the state store. + */ + public void registerCacheExternal(StateStoreCache client) { + this.cachesToUpdateExternal.add(client); + } + + /** + * Refresh the cache with information from the State Store. Called + * periodically by the CacheUpdateService to maintain data caches and + * versions. + */ + public void refreshCaches() { + refreshCaches(false); + } + + /** + * Refresh the cache with information from the State Store. Called + * periodically by the CacheUpdateService to maintain data caches and + * versions. + * @param force If we force the refresh. 
+ */ + public void refreshCaches(boolean force) { + boolean success = true; + if (isDriverReady()) { + List cachesToUpdate = new LinkedList<>(); + cachesToUpdate.addAll(cachesToUpdateInternal); + cachesToUpdate.addAll(cachesToUpdateExternal); + for (StateStoreCache cachedStore : cachesToUpdate) { + String cacheName = cachedStore.getClass().getSimpleName(); + boolean result = false; + try { + result = cachedStore.loadCache(force); + } catch (IOException e) { + LOG.error("Error updating cache for {}", cacheName, e); + result = false; + } + if (!result) { + success = false; + LOG.error("Cache update failed for cache {}", cacheName); + } + } + } else { + success = false; + LOG.info("Skipping State Store cache update, driver is not ready."); + } + if (success) { + // Uses local time, not driver time. + this.cacheLastUpdateTime = Time.now(); + } + } + + /** + * Update the cache for a specific record store. + * + * @param clazz Class of the record store. + * @return If the cache was loaded. + * @throws IOException if the cache update failed. + */ + public boolean loadCache(final Class clazz) throws IOException { + return loadCache(clazz, false); + } + + /** + * Update the cache for a specific record store. + * + * @param clazz Class of the record store. + * @param force Force the update ignoring cached periods. + * @return If the cache was loaded. + * @throws IOException if the cache update failed. 
+ */ + public boolean loadCache(Class clazz, boolean force) throws IOException { + List cachesToUpdate = + new LinkedList(); + cachesToUpdate.addAll(this.cachesToUpdateInternal); + cachesToUpdate.addAll(this.cachesToUpdateExternal); + for (StateStoreCache cachedStore : cachesToUpdate) { + if (clazz.isInstance(cachedStore)) { + return cachedStore.loadCache(force); + } + } + throw new IOException("Registered cache was not found for " + clazz); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java new file mode 100644 index 0000000..c28131f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse; 
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link MembershipStore} State Store API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MembershipStoreImpl + extends MembershipStore implements StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(MembershipStoreImpl.class); + + + /** Reported namespaces that are not decommissioned. */ + private final Set activeNamespaces; + + /** Namenodes (after evaluating the quorum) that are active in the cluster. */ + private final Map activeRegistrations; + /** Namenode status reports (raw) that were discarded for being too old. */ + private final Map expiredRegistrations; + + /** Lock to access the local memory cache. */ + private final ReadWriteLock cacheReadWriteLock = + new ReentrantReadWriteLock(); + private final Lock cacheReadLock = cacheReadWriteLock.readLock(); + private final Lock cacheWriteLock = cacheReadWriteLock.writeLock(); + + + public MembershipStoreImpl(StateStoreDriver driver) { + super(driver); + + this.activeRegistrations = new HashMap<>(); + this.expiredRegistrations = new HashMap<>(); + this.activeNamespaces = new TreeSet<>(); + } + + @Override + public GetNamenodeRegistrationsResponse getExpiredNamenodeRegistrations( + GetNamenodeRegistrationsRequest request) throws IOException { + + GetNamenodeRegistrationsResponse response = + GetNamenodeRegistrationsResponse.newInstance(); + cacheReadLock.lock(); + try { + Collection vals = this.expiredRegistrations.values(); + List copyVals = new ArrayList<>(vals); + response.setNamenodeMemberships(copyVals); + } finally { + cacheReadLock.unlock(); + } + return response; + } + + @Override + public GetNamespaceInfoResponse getNamespaceInfo( + GetNamespaceInfoRequest request) throws IOException { + + Set namespaces = new 
HashSet<>(); + try { + cacheReadLock.lock(); + namespaces.addAll(activeNamespaces); + } finally { + cacheReadLock.unlock(); + } + + GetNamespaceInfoResponse response = + GetNamespaceInfoResponse.newInstance(namespaces); + return response; + } + + @Override + public GetNamenodeRegistrationsResponse getNamenodeRegistrations( + final GetNamenodeRegistrationsRequest request) throws IOException { + + // TODO Cache some common queries and sorts + List ret = null; + + cacheReadLock.lock(); + try { + Collection registrations = activeRegistrations.values(); + MembershipState partialMembership = request.getPartialMembership(); + if (partialMembership == null) { + ret = new ArrayList<>(registrations); + } else { + Query query = new Query<>(partialMembership); + ret = filterMultiple(query, registrations); + } + } finally { + cacheReadLock.unlock(); + } + // Sort in ascending update date order + Collections.sort(ret); + + GetNamenodeRegistrationsResponse response = + GetNamenodeRegistrationsResponse.newInstance(ret); + return response; + } + + @Override + public NamenodeHeartbeatResponse namenodeHeartbeat( + NamenodeHeartbeatRequest request) throws IOException { + + MembershipState record = request.getNamenodeMembership(); + String nnId = record.getNamenodeKey(); + MembershipState existingEntry = null; + cacheReadLock.lock(); + try { + existingEntry = this.activeRegistrations.get(nnId); + } finally { + cacheReadLock.unlock(); + } + + if (existingEntry != null) { + if (existingEntry.getState() != record.getState()) { + LOG.info("NN registration state has changed: {} -> {}", + existingEntry, record); + } else { + LOG.debug("Updating NN registration: {} -> {}", existingEntry, record); + } + } else { + LOG.info("Inserting new NN registration: {}", record); + } + + boolean status = getDriver().put(record, true, false); + + NamenodeHeartbeatResponse response = + NamenodeHeartbeatResponse.newInstance(status); + return response; + } + + @Override + public boolean loadCache(boolean 
force) throws IOException { + super.loadCache(force); + + // Update local cache atomically + cacheWriteLock.lock(); + try { + this.activeRegistrations.clear(); + this.expiredRegistrations.clear(); + this.activeNamespaces.clear(); + + // Build list of NN registrations: nnId -> registration list + Map> nnRegistrations = new HashMap<>(); + List cachedRecords = getCachedRecords(); + for (MembershipState membership : cachedRecords) { + String nnId = membership.getNamenodeKey(); + if (membership.getState() == FederationNamenodeServiceState.EXPIRED) { + // Expired, RPC service does not use these + String key = membership.getPrimaryKey(); + this.expiredRegistrations.put(key, membership); + } else { + // This is a valid NN registration, build a list of all registrations + // using the NN id to use for the quorum calculation. + List nnRegistration = + nnRegistrations.get(nnId); + if (nnRegistration == null) { + nnRegistration = new LinkedList<>(); + nnRegistrations.put(nnId, nnRegistration); + } + nnRegistration.add(membership); + String bpId = membership.getBlockPoolId(); + String cId = membership.getClusterId(); + String nsId = membership.getNameserviceId(); + FederationNamespaceInfo nsInfo = + new FederationNamespaceInfo(bpId, cId, nsId); + this.activeNamespaces.add(nsInfo); + } + } + + // Calculate most representative entry for each active NN id + for (List nnRegistration : nnRegistrations.values()) { + // Run quorum based on NN state + MembershipState representativeRecord = + getRepresentativeQuorum(nnRegistration); + String nnKey = representativeRecord.getNamenodeKey(); + this.activeRegistrations.put(nnKey, representativeRecord); + } + LOG.debug("Refreshed {} NN registrations from State Store", + cachedRecords.size()); + } finally { + cacheWriteLock.unlock(); + } + return true; + } + + @Override + public UpdateNamenodeRegistrationResponse updateNamenodeRegistration( + UpdateNamenodeRegistrationRequest request) throws IOException { + + boolean status = false; + 
cacheWriteLock.lock(); + try { + String namenode = MembershipState.getNamenodeKey( + request.getNameserviceId(), request.getNamenodeId()); + MembershipState member = this.activeRegistrations.get(namenode); + if (member != null) { + member.setState(request.getState()); + status = true; + } + } finally { + cacheWriteLock.unlock(); + } + UpdateNamenodeRegistrationResponse response = + UpdateNamenodeRegistrationResponse.newInstance(status); + return response; + } + + /** + * Picks the most recent entry in the subset that is most agreeable on the + * specified field. 1) If a majority of the collection has the same value for + * the field, the first sorted entry within the subset that matches the + * majority value 2) Otherwise the first sorted entry in the set of all + * entries + * + * @param records - Collection of state store record objects of the same type + * @return record that is most representative of the namenode state, or null + * if the collection is empty + */ + private MembershipState getRepresentativeQuorum( + Collection records) { + + // Collate objects by field value: field value -> order set of records + Map> occurenceMap = + new HashMap<>(); + for (MembershipState record : records) { + FederationNamenodeServiceState state = record.getState(); + TreeSet matchingSet = occurenceMap.get(state); + if (matchingSet == null) { + // TreeSet orders elements by descending date via comparators + matchingSet = new TreeSet<>(); + occurenceMap.put(state, matchingSet); + } + matchingSet.add(record); + } + + // Select largest group + TreeSet largestSet = new TreeSet<>(); + for (TreeSet matchingSet : occurenceMap.values()) { + if (largestSet.size() < matchingSet.size()) { + largestSet = matchingSet; + } + } + + // If quorum, use the newest element here + if (largestSet.size() > records.size() / 2) { + return largestSet.first(); + // Otherwise, return most recent by class comparator + } else if (records.size() > 0) { + TreeSet sortedList = new 
TreeSet<>(records); + LOG.debug("Quorum failed, using most recent: {}", sortedList.first()); + return sortedList.first(); + } else { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java new file mode 100644 index 0000000..1a50d15 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementations of the state store API interfaces. All classes + * derive from {@link + * org.apache.hadoop.hdfs.server.federation.store.RecordStore}. The API + * definitions are contained in the + * org.apache.hadoop.hdfs.server.federation.store package. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java new file mode 100644 index 0000000..568feaf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; + +/** + * API request for listing namenode registrations present in the state store. + */ +public abstract class GetNamenodeRegistrationsRequest { + + public static GetNamenodeRegistrationsRequest newInstance() + throws IOException { + return StateStoreSerializer.newRecord( + GetNamenodeRegistrationsRequest.class); + } + + public static GetNamenodeRegistrationsRequest newInstance( + MembershipState member) throws IOException { + GetNamenodeRegistrationsRequest request = newInstance(); + request.setPartialMembership(member); + return request; + } + + @Public + @Unstable + public abstract MembershipState getPartialMembership(); + + @Public + @Unstable + public abstract void setPartialMembership(MembershipState member); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java new file mode 100644 index 0000000..0d60c90 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; + +/** + * API response for listing namenode registrations present in the state store. 
+ */ +public abstract class GetNamenodeRegistrationsResponse { + + public static GetNamenodeRegistrationsResponse newInstance() + throws IOException { + return StateStoreSerializer.newRecord( + GetNamenodeRegistrationsResponse.class); + } + + public static GetNamenodeRegistrationsResponse newInstance( + List<MembershipState> records) throws IOException { + GetNamenodeRegistrationsResponse response = newInstance(); + response.setNamenodeMemberships(records); + return response; + } + + @Public + @Unstable + public abstract List<MembershipState> getNamenodeMemberships() + throws IOException; + + @Public + @Unstable + public abstract void setNamenodeMemberships( + List<MembershipState> records) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java new file mode 100644 index 0000000..b5cc01b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for listing HDFS namespaces present in the state store. + */ +public abstract class GetNamespaceInfoRequest { + + public static GetNamespaceInfoRequest newInstance() { + return StateStoreSerializer.newRecord(GetNamespaceInfoRequest.class); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java new file mode 100644 index 0000000..f541453 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API response for listing HDFS namespaces present in the state store. + */ +public abstract class GetNamespaceInfoResponse { + + public static GetNamespaceInfoResponse newInstance() { + return StateStoreSerializer.newRecord(GetNamespaceInfoResponse.class); + } + + public static GetNamespaceInfoResponse newInstance( + Set<FederationNamespaceInfo> namespaces) throws IOException { + GetNamespaceInfoResponse response = newInstance(); + response.setNamespaceInfo(namespaces); + return response; + } + + @Public + @Unstable + public abstract Set<FederationNamespaceInfo> getNamespaceInfo(); + + @Public + @Unstable + public abstract void setNamespaceInfo( + Set<FederationNamespaceInfo> namespaceInfo); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java new file mode 100644 index 0000000..9506026 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; + +/** + * API request for registering a namenode with the state store. 
+ */ +public abstract class NamenodeHeartbeatRequest { + + public static NamenodeHeartbeatRequest newInstance() throws IOException { + return StateStoreSerializer.newRecord(NamenodeHeartbeatRequest.class); + } + + public static NamenodeHeartbeatRequest newInstance(MembershipState namenode) + throws IOException { + NamenodeHeartbeatRequest request = newInstance(); + request.setNamenodeMembership(namenode); + return request; + } + + @Private + @Unstable + public abstract MembershipState getNamenodeMembership() + throws IOException; + + @Private + @Unstable + public abstract void setNamenodeMembership(MembershipState report) + throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java new file mode 100644 index 0000000..acb7a6f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API response for registering a namenode with the state store. + */ +public abstract class NamenodeHeartbeatResponse { + + public static NamenodeHeartbeatResponse newInstance() throws IOException { + return StateStoreSerializer.newRecord(NamenodeHeartbeatResponse.class); + } + + public static NamenodeHeartbeatResponse newInstance(boolean status) + throws IOException { + NamenodeHeartbeatResponse response = newInstance(); + response.setResult(status); + return response; + } + + @Private + @Unstable + public abstract boolean getResult(); + + @Private + @Unstable + public abstract void setResult(boolean result); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67299a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java new file mode 100644 
index 0000000..4459e33 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for overriding an existing namenode registration in the state + * store. 
+ */ +public abstract class UpdateNamenodeRegistrationRequest { + + public static UpdateNamenodeRegistrationRequest newInstance() + throws IOException { + return StateStoreSerializer.newRecord( + UpdateNamenodeRegistrationRequest.class); + } + + public static UpdateNamenodeRegistrationRequest newInstance( + String nameserviceId, String namenodeId, + FederationNamenodeServiceState state) throws IOException { + UpdateNamenodeRegistrationRequest request = newInstance(); + request.setNameserviceId(nameserviceId); + request.setNamenodeId(namenodeId); + request.setState(state); + return request; + } + + @Private + @Unstable + public abstract String getNameserviceId(); + + @Private + @Unstable + public abstract String getNamenodeId(); + + @Private + @Unstable + public abstract FederationNamenodeServiceState getState(); + + @Private + @Unstable + public abstract void setNameserviceId(String nsId); + + @Private + @Unstable + public abstract void setNamenodeId(String nnId); + + @Private + @Unstable + public abstract void setState(FederationNamenodeServiceState state); +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org