From common-commits-return-79962-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Tue Mar 20 06:18:35 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 59B2B18077A for ; Tue, 20 Mar 2018 06:18:33 +0100 (CET) Received: (qmail 14455 invoked by uid 500); 20 Mar 2018 05:18:29 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 12146 invoked by uid 99); 20 Mar 2018 05:18:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Mar 2018 05:18:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6ECA5F66FE; Tue, 20 Mar 2018 05:18:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: weiy@apache.org To: common-commits@hadoop.apache.org Date: Tue, 20 Mar 2018 05:18:42 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/39] hadoop git commit: HDFS-13215. RBF: Move Router to its own module. 
Contributed by Wei Yan http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java deleted file mode 100644 index aaf2817..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java +++ /dev/null @@ -1,362 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.ha.HAServiceStatus; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.tools.NNHAServiceTarget; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The {@link Router} periodically checks the state of a Namenode (usually on - * the same server) and reports their high availability (HA) state and - * load/space status to the - * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService} - * . Note that this is an optional role as a Router can be independent of any - * subcluster. - *

- * For performance with Namenode HA, the Router uses the high availability state - * information in the State Store to forward the request to the Namenode that is - * most likely to be active. - *

- * Note that this service can be embedded into the Namenode itself to simplify - * the operation. - */ -public class NamenodeHeartbeatService extends PeriodicService { - - private static final Logger LOG = - LoggerFactory.getLogger(NamenodeHeartbeatService.class); - - - /** Configuration for the heartbeat. */ - private Configuration conf; - - /** Router performing the heartbeating. */ - private final ActiveNamenodeResolver resolver; - - /** Interface to the tracked NN. */ - private final String nameserviceId; - private final String namenodeId; - - /** Namenode HA target. */ - private NNHAServiceTarget localTarget; - /** RPC address for the namenode. */ - private String rpcAddress; - /** Service RPC address for the namenode. */ - private String serviceAddress; - /** Service RPC address for the namenode. */ - private String lifelineAddress; - /** HTTP address for the namenode. */ - private String webAddress; - - /** - * Create a new Namenode status updater. - * @param resolver Namenode resolver service to handle NN registration. - * @param nsId Identifier of the nameservice. - * @param nnId Identifier of the namenode in HA. - */ - public NamenodeHeartbeatService( - ActiveNamenodeResolver resolver, String nsId, String nnId) { - super(NamenodeHeartbeatService.class.getSimpleName() + - (nsId == null ? "" : " " + nsId) + - (nnId == null ? 
"" : " " + nnId)); - - this.resolver = resolver; - - this.nameserviceId = nsId; - this.namenodeId = nnId; - - } - - @Override - protected void serviceInit(Configuration configuration) throws Exception { - - this.conf = configuration; - - String nnDesc = nameserviceId; - if (this.namenodeId != null && !this.namenodeId.isEmpty()) { - this.localTarget = new NNHAServiceTarget( - conf, nameserviceId, namenodeId); - nnDesc += "-" + namenodeId; - } else { - this.localTarget = null; - } - - // Get the RPC address for the clients to connect - this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId); - LOG.info("{} RPC address: {}", nnDesc, rpcAddress); - - // Get the Service RPC address for monitoring - this.serviceAddress = - DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId); - if (this.serviceAddress == null) { - LOG.error("Cannot locate RPC service address for NN {}, " + - "using RPC address {}", nnDesc, this.rpcAddress); - this.serviceAddress = this.rpcAddress; - } - LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress); - - // Get the Lifeline RPC address for faster monitoring - this.lifelineAddress = - DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId); - if (this.lifelineAddress == null) { - this.lifelineAddress = this.serviceAddress; - } - LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress); - - // Get the Web address for UI - this.webAddress = - DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId); - LOG.info("{} Web address: {}", nnDesc, webAddress); - - this.setIntervalMs(conf.getLong( - DFS_ROUTER_HEARTBEAT_INTERVAL_MS, - DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT)); - - - super.serviceInit(configuration); - } - - @Override - public void periodicInvoke() { - updateState(); - } - - /** - * Get the RPC address for a Namenode. - * @param conf Configuration. - * @param nsId Name service identifier. - * @param nnId Name node identifier. - * @return RPC address in format hostname:1234. 
- */ - private static String getRpcAddress( - Configuration conf, String nsId, String nnId) { - - // Get it from the regular RPC setting - String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; - String ret = conf.get(confKey); - - if (nsId != null || nnId != null) { - // Get if for the proper nameservice and namenode - confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId); - ret = conf.get(confKey); - - // If not available, get it from the map - if (ret == null) { - Map rpcAddresses = - DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); - InetSocketAddress sockAddr = null; - if (nnId != null) { - sockAddr = rpcAddresses.get(nnId); - } else if (rpcAddresses.size() == 1) { - // Get the only namenode in the namespace - sockAddr = rpcAddresses.values().iterator().next(); - } - if (sockAddr != null) { - InetAddress addr = sockAddr.getAddress(); - ret = addr.getHostName() + ":" + sockAddr.getPort(); - } - } - } - return ret; - } - - /** - * Update the state of the Namenode. - */ - private void updateState() { - NamenodeStatusReport report = getNamenodeStatusReport(); - if (!report.registrationValid()) { - // Not operational - LOG.error("Namenode is not operational: {}", getNamenodeDesc()); - } else if (report.haStateValid()) { - // block and HA status available - LOG.debug("Received service state: {} from HA namenode: {}", - report.getState(), getNamenodeDesc()); - } else if (localTarget == null) { - // block info available, HA status not expected - LOG.debug( - "Reporting non-HA namenode as operational: " + getNamenodeDesc()); - } else { - // block info available, HA status should be available, but was not - // fetched do nothing and let the current state stand - return; - } - try { - if (!resolver.registerNamenode(report)) { - LOG.warn("Cannot register namenode {}", report); - } - } catch (IOException e) { - LOG.info("Cannot register namenode in the State Store"); - } catch (Exception ex) { - LOG.error("Unhandled exception updating NN registration for 
{}", - getNamenodeDesc(), ex); - } - } - - /** - * Get the status report for the Namenode monitored by this heartbeater. - * @return Namenode status report. - */ - protected NamenodeStatusReport getNamenodeStatusReport() { - NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId, - namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress); - - try { - LOG.debug("Probing NN at service address: {}", serviceAddress); - - URI serviceURI = new URI("hdfs://" + serviceAddress); - // Read the filesystem info from RPC (required) - NamenodeProtocol nn = NameNodeProxies - .createProxy(this.conf, serviceURI, NamenodeProtocol.class) - .getProxy(); - - if (nn != null) { - NamespaceInfo info = nn.versionRequest(); - if (info != null) { - report.setNamespaceInfo(info); - } - } - if (!report.registrationValid()) { - return report; - } - - // Check for safemode from the client protocol. Currently optional, but - // should be required at some point for QoS - try { - ClientProtocol client = NameNodeProxies - .createProxy(this.conf, serviceURI, ClientProtocol.class) - .getProxy(); - if (client != null) { - boolean isSafeMode = client.setSafeMode( - SafeModeAction.SAFEMODE_GET, false); - report.setSafeMode(isSafeMode); - } - } catch (Exception e) { - LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e); - } - - // Read the stats from JMX (optional) - updateJMXParameters(webAddress, report); - - if (localTarget != null) { - // Try to get the HA status - try { - // Determine if NN is active - // TODO: dynamic timeout - HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000); - HAServiceStatus status = haProtocol.getServiceStatus(); - report.setHAServiceState(status.getState()); - } catch (Throwable e) { - if (e.getMessage().startsWith("HA for namenode is not enabled")) { - LOG.error("HA for {} is not enabled", getNamenodeDesc()); - localTarget = null; - } else { - // Failed to fetch HA status, ignoring failure - LOG.error("Cannot fetch 
HA status for {}: {}", - getNamenodeDesc(), e.getMessage(), e); - } - } - } - } catch(IOException e) { - LOG.error("Cannot communicate with {}: {}", - getNamenodeDesc(), e.getMessage()); - } catch(Throwable e) { - // Generic error that we don't know about - LOG.error("Unexpected exception while communicating with {}: {}", - getNamenodeDesc(), e.getMessage(), e); - } - return report; - } - - /** - * Get the description of the Namenode to monitor. - * @return Description of the Namenode to monitor. - */ - public String getNamenodeDesc() { - if (namenodeId != null && !namenodeId.isEmpty()) { - return nameserviceId + "-" + namenodeId + ":" + serviceAddress; - } else { - return nameserviceId + ":" + serviceAddress; - } - } - - /** - * Get the parameters for a Namenode from JMX and add them to the report. - * @param address Web interface of the Namenode to monitor. - * @param report Namenode status report to update with JMX data. - */ - private void updateJMXParameters( - String address, NamenodeStatusReport report) { - try { - // TODO part of this should be moved to its own utility - String query = "Hadoop:service=NameNode,name=FSNamesystem*"; - JSONArray aux = FederationUtil.getJmx(query, address); - if (aux != null) { - for (int i = 0; i < aux.length(); i++) { - JSONObject jsonObject = aux.getJSONObject(i); - String name = jsonObject.getString("name"); - if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) { - report.setDatanodeInfo( - jsonObject.getInt("NumLiveDataNodes"), - jsonObject.getInt("NumDeadDataNodes"), - jsonObject.getInt("NumDecommissioningDataNodes"), - jsonObject.getInt("NumDecomLiveDataNodes"), - jsonObject.getInt("NumDecomDeadDataNodes")); - } else if (name.equals( - "Hadoop:service=NameNode,name=FSNamesystem")) { - report.setNamesystemInfo( - jsonObject.getLong("CapacityRemaining"), - jsonObject.getLong("CapacityTotal"), - jsonObject.getLong("FilesTotal"), - jsonObject.getLong("BlocksTotal"), - jsonObject.getLong("MissingBlocks"), - 
jsonObject.getLong("PendingReplicationBlocks"), - jsonObject.getLong("UnderReplicatedBlocks"), - jsonObject.getLong("PendingDeletionBlocks"), - jsonObject.getLong("ProvidedCapacityTotal")); - } - } - } - } catch (Exception e) { - LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java deleted file mode 100644 index 5e12222..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.service.ServiceStateException; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Service to periodically execute a runnable. - */ -public abstract class PeriodicService extends AbstractService { - - private static final Logger LOG = - LoggerFactory.getLogger(PeriodicService.class); - - /** Default interval in milliseconds for the periodic service. */ - private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); - - - /** Interval for running the periodic service in milliseconds. */ - private long intervalMs; - /** Name of the service. */ - private final String serviceName; - - /** Scheduler for the periodic service. */ - private final ScheduledExecutorService scheduler; - - /** If the service is running. */ - private volatile boolean isRunning = false; - - /** How many times we run. */ - private long runCount; - /** How many errors we got. */ - private long errorCount; - /** When was the last time we executed this service successfully. */ - private long lastRun; - - /** - * Create a new periodic update service. - * - * @param name Name of the service. - */ - public PeriodicService(String name) { - this(name, DEFAULT_INTERVAL_MS); - } - - /** - * Create a new periodic update service. - * - * @param name Name of the service. - * @param interval Interval for the periodic service in milliseconds. 
- */ - public PeriodicService(String name, long interval) { - super(name); - this.serviceName = name; - this.intervalMs = interval; - - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat(this.getName() + "-%d") - .build(); - this.scheduler = Executors.newScheduledThreadPool(1, threadFactory); - } - - /** - * Set the interval for the periodic service. - * - * @param interval Interval in milliseconds. - */ - protected void setIntervalMs(long interval) { - if (getServiceState() == STATE.STARTED) { - throw new ServiceStateException("Periodic service already started"); - } else { - this.intervalMs = interval; - } - } - - /** - * Get the interval for the periodic service. - * - * @return Interval in milliseconds. - */ - protected long getIntervalMs() { - return this.intervalMs; - } - - /** - * Get how many times we failed to run the periodic service. - * - * @return Times we failed to run the periodic service. - */ - protected long getErrorCount() { - return this.errorCount; - } - - /** - * Get how many times we run the periodic service. - * - * @return Times we run the periodic service. - */ - protected long getRunCount() { - return this.runCount; - } - - /** - * Get the last time the periodic service was executed. - * - * @return Last time the periodic service was executed. - */ - protected long getLastUpdate() { - return this.lastRun; - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - LOG.info("Starting periodic service {}", this.serviceName); - startPeriodic(); - } - - @Override - protected void serviceStop() throws Exception { - stopPeriodic(); - LOG.info("Stopping periodic service {}", this.serviceName); - super.serviceStop(); - } - - /** - * Stop the periodic task. - */ - protected synchronized void stopPeriodic() { - if (this.isRunning) { - LOG.info("{} is shutting down", this.serviceName); - this.isRunning = false; - this.scheduler.shutdownNow(); - } - } - - /** - * Start the periodic execution. 
- */ - protected synchronized void startPeriodic() { - stopPeriodic(); - - // Create the runnable service - Runnable updateRunnable = new Runnable() { - @Override - public void run() { - LOG.debug("Running {} update task", serviceName); - try { - if (!isRunning) { - return; - } - periodicInvoke(); - runCount++; - lastRun = Time.now(); - } catch (Exception ex) { - errorCount++; - LOG.warn(serviceName + " service threw an exception", ex); - } - } - }; - - // Start the execution of the periodic service - this.isRunning = true; - this.scheduler.scheduleWithFixedDelay( - updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS); - } - - /** - * Method that the service will run periodically. - */ - protected abstract void periodicInvoke(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java deleted file mode 100644 index dbb6ffa..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.fs.QuotaUsage; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Module that implements the quota relevant RPC calls - * {@link ClientProtocol#setQuota(String, long, long, StorageType)} - * and - * {@link ClientProtocol#getQuotaUsage(String)} - * in the {@link RouterRpcServer}. - */ -public class Quota { - private static final Logger LOG = LoggerFactory.getLogger(Quota.class); - - /** RPC server to receive client calls. */ - private final RouterRpcServer rpcServer; - /** RPC clients to connect to the Namenodes. */ - private final RouterRpcClient rpcClient; - /** Router used in RouterRpcServer. */ - private final Router router; - - public Quota(Router router, RouterRpcServer server) { - this.router = router; - this.rpcServer = server; - this.rpcClient = server.getRPCClient(); - } - - /** - * Set quota for the federation path. - * @param path Federation path. - * @param namespaceQuota Name space quota. - * @param storagespaceQuota Storage space quota. 
- * @param type StorageType that the space quota is intended to be set on. - * @throws IOException - */ - public void setQuota(String path, long namespaceQuota, - long storagespaceQuota, StorageType type) throws IOException { - rpcServer.checkOperation(OperationCategory.WRITE); - - // Set quota for current path and its children mount table path. - final List locations = getQuotaRemoteLocations(path); - if (LOG.isDebugEnabled()) { - for (RemoteLocation loc : locations) { - LOG.debug("Set quota for path: nsId: {}, dest: {}.", - loc.getNameserviceId(), loc.getDest()); - } - } - - RemoteMethod method = new RemoteMethod("setQuota", - new Class[] {String.class, long.class, long.class, - StorageType.class}, - new RemoteParam(), namespaceQuota, storagespaceQuota, type); - rpcClient.invokeConcurrent(locations, method, false, false); - } - - /** - * Get quota usage for the federation path. - * @param path Federation path. - * @return Aggregated quota. - * @throws IOException - */ - public QuotaUsage getQuotaUsage(String path) throws IOException { - final List quotaLocs = getValidQuotaLocations(path); - RemoteMethod method = new RemoteMethod("getQuotaUsage", - new Class[] {String.class}, new RemoteParam()); - Map results = rpcClient.invokeConcurrent( - quotaLocs, method, true, false, QuotaUsage.class); - - return aggregateQuota(results); - } - - /** - * Get valid quota remote locations used in {@link #getQuotaUsage(String)}. - * Differentiate the method {@link #getQuotaRemoteLocations(String)}, this - * method will do some additional filtering. - * @param path Federation path. - * @return List of valid quota remote locations. 
- * @throws IOException - */ - private List getValidQuotaLocations(String path) - throws IOException { - final List locations = getQuotaRemoteLocations(path); - - // NameService -> Locations - Map> validLocations = new HashMap<>(); - for (RemoteLocation loc : locations) { - String nsId = loc.getNameserviceId(); - List dests = validLocations.get(nsId); - if (dests == null) { - dests = new LinkedList<>(); - dests.add(loc); - validLocations.put(nsId, dests); - } else { - // Ensure the paths in the same nameservice is different. - // Don't include parent-child paths. - boolean isChildPath = false; - for (RemoteLocation d : dests) { - if (loc.getDest().startsWith(d.getDest())) { - isChildPath = true; - break; - } - } - - if (!isChildPath) { - dests.add(loc); - } - } - } - - List quotaLocs = new LinkedList<>(); - for (List locs : validLocations.values()) { - quotaLocs.addAll(locs); - } - - return quotaLocs; - } - - /** - * Aggregate quota that queried from sub-clusters. - * @param results Quota query result. - * @return Aggregated Quota. - */ - private QuotaUsage aggregateQuota(Map results) { - long nsCount = 0; - long ssCount = 0; - boolean hasQuotaUnSet = false; - - for (Map.Entry entry : results.entrySet()) { - RemoteLocation loc = entry.getKey(); - QuotaUsage usage = entry.getValue(); - if (usage != null) { - // If quota is not set in real FileSystem, the usage - // value will return -1. 
- if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) { - hasQuotaUnSet = true; - } - - nsCount += usage.getFileAndDirectoryCount(); - ssCount += usage.getSpaceConsumed(); - LOG.debug( - "Get quota usage for path: nsId: {}, dest: {}," - + " nsCount: {}, ssCount: {}.", - loc.getNameserviceId(), loc.getDest(), - usage.getFileAndDirectoryCount(), usage.getSpaceConsumed()); - } - } - - QuotaUsage.Builder builder = new QuotaUsage.Builder() - .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount); - if (hasQuotaUnSet) { - builder.quota(HdfsConstants.QUOTA_DONT_SET); - } - - return builder.build(); - } - - /** - * Get all quota remote locations across subclusters under given - * federation path. - * @param path Federation path. - * @return List of quota remote locations. - * @throws IOException - */ - private List getQuotaRemoteLocations(String path) - throws IOException { - List locations = new LinkedList<>(); - RouterQuotaManager manager = this.router.getQuotaManager(); - if (manager != null) { - Set childrenPaths = manager.getPaths(path); - for (String childPath : childrenPaths) { - locations.addAll(rpcServer.getLocationsForPath(childPath, true)); - } - } - - return locations; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java deleted file mode 100644 index a90c460..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.commons.lang.builder.HashCodeBuilder; - -/** - * Base class for objects that are unique to a namespace. - */ -public abstract class RemoteLocationContext - implements Comparable { - - /** - * Returns an identifier for a unique namespace. - * - * @return Namespace identifier. - */ - public abstract String getNameserviceId(); - - /** - * Destination in this location. For example the path in a remote namespace. - * - * @return Destination in this location. 
- */ - public abstract String getDest(); - - @Override - public int hashCode() { - return new HashCodeBuilder(17, 31) - .append(getNameserviceId()) - .append(getDest()) - .toHashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof RemoteLocationContext) { - RemoteLocationContext other = (RemoteLocationContext) obj; - return this.getNameserviceId().equals(other.getNameserviceId()) && - this.getDest().equals(other.getDest()); - } - return false; - } - - @Override - public int compareTo(RemoteLocationContext info) { - int ret = this.getNameserviceId().compareTo(info.getNameserviceId()); - if (ret == 0) { - ret = this.getDest().compareTo(info.getDest()); - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java deleted file mode 100644 index cd57d45..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Arrays; - -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Determines the remote client protocol method and the parameter list for a - * specific location. - */ -public class RemoteMethod { - - private static final Logger LOG = LoggerFactory.getLogger(RemoteMethod.class); - - - /** List of parameters: static and dynamic values, matchings types. */ - private final Object[] params; - /** List of method parameters types, matches parameters. */ - private final Class[] types; - /** String name of the ClientProtocol method. */ - private final String methodName; - - /** - * Create a method with no parameters. - * - * @param method The string name of the ClientProtocol method. - */ - public RemoteMethod(String method) { - this.params = null; - this.types = null; - this.methodName = method; - } - - /** - * Creates a remote method generator. - * - * @param method The string name of the ClientProtocol method. - * @param pTypes A list of types to use to locate the specific method. - * @param pParams A list of parameters for the method. The order of the - * parameter list must match the order and number of the types. - * Parameters are grouped into 2 categories: - *

<ul> - *
<li>Static parameters that are immutable across locations. - *
<li>Dynamic parameters that are determined for each location by a - * RemoteParam object. To specify a dynamic parameter, pass an - * instance of RemoteParam in place of the parameter value. - * </ul>
- * @throws IOException If the types and parameter lists are not valid. - */ - public RemoteMethod(String method, Class[] pTypes, Object... pParams) - throws IOException { - - if (pParams.length != pTypes.length) { - throw new IOException("Invalid parameters for method " + method); - } - - this.params = pParams; - this.types = pTypes; - this.methodName = method; - } - - /** - * Get the represented java method. - * - * @return Method - * @throws IOException If the method cannot be found. - */ - public Method getMethod() throws IOException { - try { - if (types != null) { - return ClientProtocol.class.getDeclaredMethod(methodName, types); - } else { - return ClientProtocol.class.getDeclaredMethod(methodName); - } - } catch (NoSuchMethodException e) { - // Re-throw as an IOException - LOG.error("Cannot get method {} with types {}", - methodName, Arrays.toString(types), e); - throw new IOException(e); - } catch (SecurityException e) { - LOG.error("Cannot access method {} with types {}", - methodName, Arrays.toString(types), e); - throw new IOException(e); - } - } - - /** - * Get the calling types for this method. - * - * @return An array of calling types. - */ - public Class[] getTypes() { - return this.types; - } - - /** - * Generate a list of parameters for this specific location using no context. - * - * @return A list of parameters for the method customized for the location. - */ - public Object[] getParams() { - return this.getParams(null); - } - - /** - * Get the name of the method. - * - * @return Name of the method. - */ - public String getMethodName() { - return this.methodName; - } - - /** - * Generate a list of parameters for this specific location. Parameters are - * grouped into 2 categories: - *
<ul> - *
<li>Static parameters that are immutable across locations. - *
<li>Dynamic parameters that are determined for each location by a - * RemoteParam object. - * </ul>
- * - * @param context The context identifying the location. - * @return A list of parameters for the method customized for the location. - */ - public Object[] getParams(RemoteLocationContext context) { - if (this.params == null) { - return new Object[] {}; - } - Object[] objList = new Object[this.params.length]; - for (int i = 0; i < this.params.length; i++) { - Object currentObj = this.params[i]; - if (currentObj instanceof RemoteParam) { - // Map the parameter using the context - RemoteParam paramGetter = (RemoteParam) currentObj; - objList[i] = paramGetter.getParameterForContext(context); - } else { - objList[i] = currentObj; - } - } - return objList; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java deleted file mode 100644 index 8816ff6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.util.Map; - -/** - * A dynamically assignable parameter that is location-specific. - *

<p> - * There are 2 ways this mapping is determined: - *
<ul> - *
<li>Default: Uses the RemoteLocationContext's destination - *
<li>Map: Uses the value of the RemoteLocationContext key provided in the - * parameter map. - * </ul>
- */ -public class RemoteParam { - - private final Map paramMap; - - /** - * Constructs a default remote parameter. Always maps the value to the - * destination of the provided RemoveLocationContext. - */ - public RemoteParam() { - this.paramMap = null; - } - - /** - * Constructs a map based remote parameter. Determines the value using the - * provided RemoteLocationContext as a key into the map. - * - * @param map Map with RemoteLocationContext keys. - */ - public RemoteParam( - Map map) { - this.paramMap = map; - } - - /** - * Determine the appropriate value for this parameter based on the location. - * - * @param context Context identifying the location. - * @return A parameter specific to this location. - */ - public Object getParameterForContext(RemoteLocationContext context) { - if (context == null) { - return null; - } else if (this.paramMap != null) { - return this.paramMap.get(context); - } else { - // Default case - return context.getDest(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java deleted file mode 100644 index be9adc1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ /dev/null @@ -1,656 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newActiveNamenodeResolver; -import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newFileSubclusterResolver; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HAUtil; -import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; -import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; -import org.apache.hadoop.hdfs.server.federation.store.RouterStore; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.source.JvmMetrics; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.JvmPauseMonitor; -import 
org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Router that provides a unified view of multiple federated HDFS clusters. It - * has two main roles: (1) federated interface and (2) NameNode heartbeat. - *

<p> - * For the federated interface, the Router receives a client request, checks the - * State Store for the correct subcluster, and forwards the request to the - * active Namenode of that subcluster. The reply from the Namenode then flows in - * the opposite direction. The Routers are stateless and can be behind a load - * balancer. HDFS clients connect to the router using the same interfaces as are - * used to communicate with a namenode, namely the ClientProtocol RPC interface - * and the WebHdfs HTTP interface exposed by the router. {@link RouterRpcServer} - * {@link RouterHttpServer} - * <p>

- * For NameNode heartbeat, the Router periodically checks the state of a - * NameNode (usually on the same server) and reports their high availability - * (HA) state and load/space status to the State Store. Note that this is an - * optional role as a Router can be independent of any subcluster. - * {@link StateStoreService} {@link NamenodeHeartbeatService} - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class Router extends CompositeService { - - private static final Logger LOG = LoggerFactory.getLogger(Router.class); - - - /** Configuration for the Router. */ - private Configuration conf; - - /** Router address/identifier. */ - private String routerId; - - /** RPC interface to the client. */ - private RouterRpcServer rpcServer; - private InetSocketAddress rpcAddress; - - /** RPC interface for the admin. */ - private RouterAdminServer adminServer; - private InetSocketAddress adminAddress; - - /** HTTP interface and web application. */ - private RouterHttpServer httpServer; - - /** Interface with the State Store. */ - private StateStoreService stateStore; - - /** Interface to map global name space to HDFS subcluster name spaces. */ - private FileSubclusterResolver subclusterResolver; - - /** Interface to identify the active NN for a nameservice or blockpool ID. */ - private ActiveNamenodeResolver namenodeResolver; - /** Updates the namenode status in the namenode resolver. */ - private Collection namenodeHeartbeatServices; - - /** Router metrics. */ - private RouterMetricsService metrics; - - /** JVM pauses (GC and others). */ - private JvmPauseMonitor pauseMonitor; - - /** Quota usage update service. */ - private RouterQuotaUpdateService quotaUpdateService; - /** Quota cache manager. */ - private RouterQuotaManager quotaManager; - - /** Manages the current state of the router. */ - private RouterStore routerStateManager; - /** Heartbeat our run status to the router state manager. 
*/ - private RouterHeartbeatService routerHeartbeatService; - /** Enter/exit safemode. */ - private RouterSafemodeService safemodeService; - - /** The start time of the namesystem. */ - private final long startTime = Time.now(); - - /** State of the Router. */ - private RouterServiceState state = RouterServiceState.UNINITIALIZED; - - - ///////////////////////////////////////////////////////// - // Constructor - ///////////////////////////////////////////////////////// - - public Router() { - super(Router.class.getName()); - } - - ///////////////////////////////////////////////////////// - // Service management - ///////////////////////////////////////////////////////// - - @Override - protected void serviceInit(Configuration configuration) throws Exception { - this.conf = configuration; - updateRouterState(RouterServiceState.INITIALIZING); - - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, - DFSConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) { - // Service that maintains the State Store connection - this.stateStore = new StateStoreService(); - addService(this.stateStore); - } - - // Resolver to track active NNs - this.namenodeResolver = newActiveNamenodeResolver( - this.conf, this.stateStore); - if (this.namenodeResolver == null) { - throw new IOException("Cannot find namenode resolver."); - } - - // Lookup interface to map between the global and subcluster name spaces - this.subclusterResolver = newFileSubclusterResolver(this.conf, this); - if (this.subclusterResolver == null) { - throw new IOException("Cannot find subcluster resolver"); - } - - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, - DFSConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) { - // Create RPC server - this.rpcServer = createRpcServer(); - addService(this.rpcServer); - this.setRpcServerAddress(rpcServer.getRpcAddress()); - } - - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE, - DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) { - // Create admin server - 
this.adminServer = createAdminServer(); - addService(this.adminServer); - } - - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_HTTP_ENABLE, - DFSConfigKeys.DFS_ROUTER_HTTP_ENABLE_DEFAULT)) { - // Create HTTP server - this.httpServer = createHttpServer(); - addService(this.httpServer); - } - - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, - DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) { - - // Create status updater for each monitored Namenode - this.namenodeHeartbeatServices = createNamenodeHeartbeatServices(); - for (NamenodeHeartbeatService hearbeatService : - this.namenodeHeartbeatServices) { - addService(hearbeatService); - } - - if (this.namenodeHeartbeatServices.isEmpty()) { - LOG.error("Heartbeat is enabled but there are no namenodes to monitor"); - } - - // Periodically update the router state - this.routerHeartbeatService = new RouterHeartbeatService(this); - addService(this.routerHeartbeatService); - } - - // Router metrics system - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, - DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) { - - DefaultMetricsSystem.initialize("Router"); - - this.metrics = new RouterMetricsService(this); - addService(this.metrics); - - // JVM pause monitor - this.pauseMonitor = new JvmPauseMonitor(); - this.pauseMonitor.init(conf); - } - - // Initial quota relevant service - if (conf.getBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE, - DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLED_DEFAULT)) { - this.quotaManager = new RouterQuotaManager(); - this.quotaUpdateService = new RouterQuotaUpdateService(this); - addService(this.quotaUpdateService); - } - - // Safemode service to refuse RPC calls when the router is out of sync - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE, - DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) { - // Create safemode monitoring service - this.safemodeService = new RouterSafemodeService(this); - addService(this.safemodeService); - } - - 
super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - - if (this.safemodeService == null) { - // Router is running now - updateRouterState(RouterServiceState.RUNNING); - } - - if (this.pauseMonitor != null) { - this.pauseMonitor.start(); - JvmMetrics jvmMetrics = this.metrics.getJvmMetrics(); - if (jvmMetrics != null) { - jvmMetrics.setPauseMonitor(pauseMonitor); - } - } - - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - - // Update state - updateRouterState(RouterServiceState.SHUTDOWN); - - // JVM pause monitor - if (this.pauseMonitor != null) { - this.pauseMonitor.stop(); - } - - super.serviceStop(); - } - - /** - * Shutdown the router. - */ - public void shutDown() { - new Thread() { - @Override - public void run() { - Router.this.stop(); - } - }.start(); - } - - ///////////////////////////////////////////////////////// - // RPC Server - ///////////////////////////////////////////////////////// - - /** - * Create a new Router RPC server to proxy ClientProtocol requests. - * - * @return New Router RPC Server. - * @throws IOException If the router RPC server was not started. - */ - protected RouterRpcServer createRpcServer() throws IOException { - return new RouterRpcServer(this.conf, this, this.getNamenodeResolver(), - this.getSubclusterResolver()); - } - - /** - * Get the Router RPC server. - * - * @return Router RPC server. - */ - public RouterRpcServer getRpcServer() { - return this.rpcServer; - } - - /** - * Set the current RPC socket for the router. - * - * @param address RPC address. 
- */ - protected void setRpcServerAddress(InetSocketAddress address) { - this.rpcAddress = address; - - // Use the RPC address as our unique router Id - if (this.rpcAddress != null) { - try { - String hostname = InetAddress.getLocalHost().getHostName(); - setRouterId(hostname + ":" + this.rpcAddress.getPort()); - } catch (UnknownHostException ex) { - LOG.error("Cannot set unique router ID, address not resolvable {}", - this.rpcAddress); - } - } - } - - /** - * Get the current RPC socket address for the router. - * - * @return InetSocketAddress - */ - public InetSocketAddress getRpcServerAddress() { - return this.rpcAddress; - } - - ///////////////////////////////////////////////////////// - // Admin server - ///////////////////////////////////////////////////////// - - /** - * Create a new router admin server to handle the router admin interface. - * - * @return RouterAdminServer - * @throws IOException If the admin server was not successfully started. - */ - protected RouterAdminServer createAdminServer() throws IOException { - return new RouterAdminServer(this.conf, this); - } - - /** - * Set the current Admin socket for the router. - * - * @param address Admin RPC address. - */ - protected void setAdminServerAddress(InetSocketAddress address) { - this.adminAddress = address; - } - - /** - * Get the current Admin socket address for the router. - * - * @return InetSocketAddress Admin address. - */ - public InetSocketAddress getAdminServerAddress() { - return adminAddress; - } - - ///////////////////////////////////////////////////////// - // HTTP server - ///////////////////////////////////////////////////////// - - /** - * Create an HTTP server for this Router. - * - * @return HTTP server for this Router. - */ - protected RouterHttpServer createHttpServer() { - return new RouterHttpServer(this); - } - - /** - * Get the current HTTP socket address for the router. - * - * @return InetSocketAddress HTTP address. 
- */ - public InetSocketAddress getHttpServerAddress() { - if (httpServer != null) { - return httpServer.getHttpAddress(); - } - return null; - } - - ///////////////////////////////////////////////////////// - // Namenode heartbeat monitors - ///////////////////////////////////////////////////////// - - /** - * Create each of the services that will monitor a Namenode. - * - * @return List of heartbeat services. - */ - protected Collection - createNamenodeHeartbeatServices() { - - Map ret = new HashMap<>(); - - if (conf.getBoolean( - DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, - DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) { - // Create a local heartbet service - NamenodeHeartbeatService localHeartbeatService = - createLocalNamenodeHearbeatService(); - if (localHeartbeatService != null) { - String nnDesc = localHeartbeatService.getNamenodeDesc(); - ret.put(nnDesc, localHeartbeatService); - } - } - - // Create heartbeat services for a list specified by the admin - String namenodes = this.conf.get( - DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE); - if (namenodes != null) { - for (String namenode : namenodes.split(",")) { - String[] namenodeSplit = namenode.split("\\."); - String nsId = null; - String nnId = null; - if (namenodeSplit.length == 2) { - nsId = namenodeSplit[0]; - nnId = namenodeSplit[1]; - } else if (namenodeSplit.length == 1) { - nsId = namenode; - } else { - LOG.error("Wrong Namenode to monitor: {}", namenode); - } - if (nsId != null) { - NamenodeHeartbeatService heartbeatService = - createNamenodeHearbeatService(nsId, nnId); - if (heartbeatService != null) { - ret.put(heartbeatService.getNamenodeDesc(), heartbeatService); - } - } - } - } - - return ret.values(); - } - - /** - * Create a new status updater for the local Namenode. - * - * @return Updater of the status for the local Namenode. 
- */ - protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() { - // Detect NN running in this machine - String nsId = DFSUtil.getNamenodeNameServiceId(conf); - String nnId = null; - if (HAUtil.isHAEnabled(conf, nsId)) { - nnId = HAUtil.getNameNodeId(conf, nsId); - if (nnId == null) { - LOG.error("Cannot find namenode id for local {}", nsId); - } - } - - return createNamenodeHearbeatService(nsId, nnId); - } - - /** - * Create a heartbeat monitor for a particular Namenode. - * - * @param nsId Identifier of the nameservice to monitor. - * @param nnId Identifier of the namenode (HA) to monitor. - * @return Updater of the status for the specified Namenode. - */ - protected NamenodeHeartbeatService createNamenodeHearbeatService( - String nsId, String nnId) { - - LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId); - NamenodeHeartbeatService ret = new NamenodeHeartbeatService( - namenodeResolver, nsId, nnId); - return ret; - } - - ///////////////////////////////////////////////////////// - // Router State Management - ///////////////////////////////////////////////////////// - - /** - * Update the router state and heartbeat to the state store. - * - * @param state The new router state. - */ - public void updateRouterState(RouterServiceState newState) { - this.state = newState; - if (this.routerHeartbeatService != null) { - this.routerHeartbeatService.updateStateAsync(); - } - } - - /** - * Get the status of the router. - * - * @return Status of the router. - */ - public RouterServiceState getRouterState() { - return this.state; - } - - ///////////////////////////////////////////////////////// - // Submodule getters - ///////////////////////////////////////////////////////// - - /** - * Get the State Store service. - * - * @return State Store service. - */ - public StateStoreService getStateStore() { - return this.stateStore; - } - - /** - * Get the metrics system for the Router. - * - * @return Router metrics. 
- */ - public RouterMetrics getRouterMetrics() { - if (this.metrics != null) { - return this.metrics.getRouterMetrics(); - } - return null; - } - - /** - * Get the federation metrics. - * - * @return Federation metrics. - */ - public FederationMetrics getMetrics() { - if (this.metrics != null) { - return this.metrics.getFederationMetrics(); - } - return null; - } - - /** - * Get the subcluster resolver for files. - * - * @return Subcluster resolver for files. - */ - public FileSubclusterResolver getSubclusterResolver() { - return this.subclusterResolver; - } - - /** - * Get the namenode resolver for a subcluster. - * - * @return The namenode resolver for a subcluster. - */ - public ActiveNamenodeResolver getNamenodeResolver() { - return this.namenodeResolver; - } - - /** - * Get the state store interface for the router heartbeats. - * - * @return FederationRouterStateStore state store API handle. - */ - public RouterStore getRouterStateManager() { - if (this.routerStateManager == null && this.stateStore != null) { - this.routerStateManager = this.stateStore.getRegisteredRecordStore( - RouterStore.class); - } - return this.routerStateManager; - } - - ///////////////////////////////////////////////////////// - // Router info - ///////////////////////////////////////////////////////// - - /** - * Get the start date of the Router. - * - * @return Start date of the router. - */ - public long getStartTime() { - return this.startTime; - } - - /** - * Unique ID for the router, typically the hostname:port string for the - * router's RPC server. This ID may be null on router startup before the RPC - * server has bound to a port. - * - * @return Router identifier. - */ - public String getRouterId() { - return this.routerId; - } - - /** - * Sets a unique ID for this router. - * - * @param id Identifier of the Router. 
- */ - public void setRouterId(String id) { - this.routerId = id; - if (this.stateStore != null) { - this.stateStore.setIdentifier(this.routerId); - } - if (this.namenodeResolver != null) { - this.namenodeResolver.setRouterId(this.routerId); - } - } - - /** - * If the quota system is enabled in Router. - */ - public boolean isQuotaEnabled() { - return this.quotaManager != null; - } - - /** - * Get route quota manager. - * @return RouterQuotaManager Quota manager. - */ - public RouterQuotaManager getQuotaManager() { - return this.quotaManager; - } - - /** - * Get quota cache update service. - */ - @VisibleForTesting - RouterQuotaUpdateService getQuotaCacheUpdateService() { - return this.quotaUpdateService; - } - - /** - * Get the list of namenode heartbeat service. - */ - @VisibleForTesting - Collection getNamenodeHearbeatServices() { - return this.namenodeHeartbeatServices; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java deleted file mode 100644 index 0c0448c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService; -import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; -import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; -import 
org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RPC.Server; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.AbstractService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingService; - -/** - * This class is responsible for handling all of the Admin calls to the HDFS - * router. It is created, started, and stopped by {@link Router}. - */ -public class RouterAdminServer extends AbstractService - implements MountTableManager, RouterStateManager { - - private static final Logger LOG = - LoggerFactory.getLogger(RouterAdminServer.class); - - private Configuration conf; - - private final Router router; - - private MountTableStore mountTableStore; - - /** The Admin server that listens to requests from clients. */ - private final Server adminServer; - private final InetSocketAddress adminAddress; - - /** - * Permission related info used for constructing new router permission - * checker instance. 
- */ - private static String routerOwner; - private static String superGroup; - private static boolean isPermissionEnabled; - - public RouterAdminServer(Configuration conf, Router router) - throws IOException { - super(RouterAdminServer.class.getName()); - - this.conf = conf; - this.router = router; - - int handlerCount = this.conf.getInt( - DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY, - DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT); - - RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class, - ProtobufRpcEngine.class); - - RouterAdminProtocolServerSideTranslatorPB routerAdminProtocolTranslator = - new RouterAdminProtocolServerSideTranslatorPB(this); - BlockingService clientNNPbService = RouterAdminProtocolService. - newReflectiveBlockingService(routerAdminProtocolTranslator); - - InetSocketAddress confRpcAddress = conf.getSocketAddr( - DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, - DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, - DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT, - DFSConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT); - - String bindHost = conf.get( - DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, - confRpcAddress.getHostName()); - LOG.info("Admin server binding to {}:{}", - bindHost, confRpcAddress.getPort()); - - initializePermissionSettings(this.conf); - this.adminServer = new RPC.Builder(this.conf) - .setProtocol(RouterAdminProtocolPB.class) - .setInstance(clientNNPbService) - .setBindAddress(bindHost) - .setPort(confRpcAddress.getPort()) - .setNumHandlers(handlerCount) - .setVerbose(false) - .build(); - - // The RPC-server port can be ephemeral... ensure we have the correct info - InetSocketAddress listenAddress = this.adminServer.getListenerAddress(); - this.adminAddress = new InetSocketAddress( - confRpcAddress.getHostName(), listenAddress.getPort()); - router.setAdminServerAddress(this.adminAddress); - } - - /** - * Initialize permission related settings. 
- * - * @param routerConf Router configuration to read the permission settings from. - * @throws IOException If the current user cannot be obtained. - */ - private static void initializePermissionSettings(Configuration routerConf) - throws IOException { - routerOwner = UserGroupInformation.getCurrentUser().getShortUserName(); - superGroup = routerConf.get( - DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, - DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); - isPermissionEnabled = routerConf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY, - DFS_PERMISSIONS_ENABLED_DEFAULT); - } - - /** Allow access to the admin RPC server for testing. */ - @VisibleForTesting - Server getAdminServer() { - return this.adminServer; - } - - private MountTableStore getMountTableStore() throws IOException { - if (this.mountTableStore == null) { - this.mountTableStore = router.getStateStore().getRegisteredRecordStore( - MountTableStore.class); - if (this.mountTableStore == null) { - throw new IOException("Mount table state store is not available."); - } - } - return this.mountTableStore; - } - - /** - * Get the RPC address of the admin service. - * @return Administration service RPC address. 
- */ - public InetSocketAddress getRpcAddress() { - return this.adminAddress; - } - - @Override - protected void serviceInit(Configuration configuration) throws Exception { - this.conf = configuration; - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - this.adminServer.start(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - if (this.adminServer != null) { - this.adminServer.stop(); - } - super.serviceStop(); - } - - @Override - public AddMountTableEntryResponse addMountTableEntry( - AddMountTableEntryRequest request) throws IOException { - return getMountTableStore().addMountTableEntry(request); - } - - @Override - public UpdateMountTableEntryResponse updateMountTableEntry( - UpdateMountTableEntryRequest request) throws IOException { - return getMountTableStore().updateMountTableEntry(request); - } - - @Override - public RemoveMountTableEntryResponse removeMountTableEntry( - RemoveMountTableEntryRequest request) throws IOException { - return getMountTableStore().removeMountTableEntry(request); - } - - @Override - public GetMountTableEntriesResponse getMountTableEntries( - GetMountTableEntriesRequest request) throws IOException { - return getMountTableStore().getMountTableEntries(request); - } - - @Override - public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request) - throws IOException { - this.router.updateRouterState(RouterServiceState.SAFEMODE); - this.router.getRpcServer().setSafeMode(true); - return EnterSafeModeResponse.newInstance(verifySafeMode(true)); - } - - @Override - public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request) - throws IOException { - this.router.updateRouterState(RouterServiceState.RUNNING); - this.router.getRpcServer().setSafeMode(false); - return LeaveSafeModeResponse.newInstance(verifySafeMode(false)); - } - - @Override - public GetSafeModeResponse getSafeMode(GetSafeModeRequest request) - throws IOException { - 
boolean isInSafeMode = this.router.getRpcServer().isInSafeMode(); - return GetSafeModeResponse.newInstance(isInSafeMode); - } - - /** - * Verify if Router set safe mode state correctly. - * @param isInSafeMode Expected state to be set. - * @return True if the Router state and RPC server match the expected state. - */ - private boolean verifySafeMode(boolean isInSafeMode) { - boolean serverInSafeMode = this.router.getRpcServer().isInSafeMode(); - RouterServiceState currentState = this.router.getRouterState(); - - return (isInSafeMode && currentState == RouterServiceState.SAFEMODE - && serverInSafeMode) - || (!isInSafeMode && currentState != RouterServiceState.SAFEMODE - && !serverInSafeMode); - } - - /** - * Get a new permission checker used for making mount table access - * control. This method will be invoked during each RPC call in router - * admin server. - * - * @return Router permission checker, or null if permissions are disabled. - * @throws AccessControlException If the permission checker cannot be created. - */ - public static RouterPermissionChecker getPermissionChecker() - throws AccessControlException { - if (!isPermissionEnabled) { - return null; - } - - try { - return new RouterPermissionChecker(routerOwner, superGroup, - NameNode.getRemoteUser()); - } catch (IOException e) { - throw new AccessControlException(e); - } - } - - /** - * Get super user name. - * - * @return String super user name. - */ - public static String getSuperUser() { - return routerOwner; - } - - /** - * Get super group name. - * - * @return String super group name. 
- */ - public static String getSuperGroup(){ - return superGroup; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java deleted file mode 100644 index b36e459..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB; -import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Client to connect to the {@link Router} via the admin protocol. The same - * RPC proxy serves as both the mount table manager and the router state - * manager; {@link #close()} stops that proxy. - */ -@Private -public class RouterClient implements Closeable { - - private final RouterAdminProtocolTranslatorPB proxy; - private final UserGroupInformation ugi; - - private static RouterAdminProtocolTranslatorPB createRouterProxy( - InetSocketAddress address, Configuration conf, UserGroupInformation ugi) - throws IOException { - - RPC.setProtocolEngine( - conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class); - - AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false); - final long version = RPC.getProtocolVersion(RouterAdminProtocolPB.class); - RouterAdminProtocolPB proxy = RPC.getProtocolProxy( - RouterAdminProtocolPB.class, version, address, ugi, conf, - NetUtils.getDefaultSocketFactory(conf), - RPC.getRpcTimeout(conf), null, - fallbackToSimpleAuth).getProxy(); - - return new RouterAdminProtocolTranslatorPB(proxy); - } - - public RouterClient(InetSocketAddress address, Configuration conf) - throws IOException { - this.ugi = UserGroupInformation.getCurrentUser(); - this.proxy = createRouterProxy(address, conf, ugi); - } - - public MountTableManager getMountTableManager() { - return proxy; - } - - public RouterStateManager getRouterStateManager() { - return proxy; - } 
- - @Override - public synchronized void close() throws IOException { - RPC.stopProxy(proxy); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java deleted file mode 100644 index 6e44984..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore; -import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; -import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; -import org.apache.hadoop.hdfs.server.federation.store.RecordStore; -import org.apache.hadoop.hdfs.server.federation.store.RouterStore; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; -import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service to periodically update the Router current state in the State Store. - */ -public class RouterHeartbeatService extends PeriodicService { - - private static final Logger LOG = - LoggerFactory.getLogger(RouterHeartbeatService.class); - - /** Router we are hearbeating. */ - private final Router router; - - /** - * Create a new Router heartbeat service. - * - * @param router Router to heartbeat. - */ - public RouterHeartbeatService(Router router) { - super(RouterHeartbeatService.class.getSimpleName()); - this.router = router; - } - - /** - * Trigger the update of the Router state asynchronously. 
- */ - protected void updateStateAsync() { - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - updateStateStore(); - } - }, "Router Heartbeat Async"); - thread.setDaemon(true); - thread.start(); - } - - /** - * Update the state of the Router in the State Store. - */ - @VisibleForTesting - synchronized void updateStateStore() { - String routerId = router.getRouterId(); - if (routerId == null) { - LOG.error("Cannot heartbeat for router: unknown router id"); - return; - } - if (isStoreAvailable()) { - RouterStore routerStore = router.getRouterStateManager(); - try { - RouterState record = RouterState.newInstance( - routerId, router.getStartTime(), router.getRouterState()); - StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance( - getStateStoreVersion(MembershipStore.class), - getStateStoreVersion(MountTableStore.class)); - record.setStateStoreVersion(stateStoreVersion); - RouterHeartbeatRequest request = - RouterHeartbeatRequest.newInstance(record); - RouterHeartbeatResponse response = routerStore.routerHeartbeat(request); - if (!response.getStatus()) { - LOG.warn("Cannot heartbeat router {}", routerId); - } else { - LOG.debug("Router heartbeat for router {}", routerId); - } - } catch (IOException e) { - LOG.error("Cannot heartbeat router {}: {}", routerId, e.getMessage()); - } - } else { - LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId); - } - } - - /** - * Get the version of the data in the State Store. - * - * @param clazz Class in the State Store. - * @return Version of the data. 
- */ - private > - long getStateStoreVersion(final Class clazz) { - long version = -1; - try { - StateStoreService stateStore = router.getStateStore(); - S recordStore = stateStore.getRegisteredRecordStore(clazz); - if (recordStore != null) { - if (recordStore instanceof CachedRecordStore) { - CachedRecordStore cachedRecordStore = - (CachedRecordStore) recordStore; - List records = cachedRecordStore.getCachedRecords(); - for (BaseRecord record : records) { - if (record.getDateModified() > version) { - version = record.getDateModified(); - } - } - } - } - } catch (Exception e) { - LOG.error("Cannot get version for {}: {}", clazz, e.getMessage()); - } - return version; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - - long interval = conf.getTimeDuration( - DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS, - DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT, - TimeUnit.MILLISECONDS); - this.setIntervalMs(interval); - - super.serviceInit(conf); - } - - @Override - public void periodicInvoke() { - updateStateStore(); - } - - private boolean isStoreAvailable() { - if (router.getRouterStateManager() == null) { - return false; - } - if (router.getStateStore() == null) { - return false; - } - return router.getStateStore().isDriverReady(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e2b5fa4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java deleted file mode 100644 index 046f0ba..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.net.InetSocketAddress; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.server.common.JspHelper; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.service.AbstractService; - -/** - * Web interface for the {@link Router}. It exposes the Web UI and the WebHDFS - * methods from {@link RouterWebHdfsMethods}. - */ -public class RouterHttpServer extends AbstractService { - - /** Attribute key under which the Router is registered in the HTTP server. */ - protected static final String NAMENODE_ATTRIBUTE_KEY = "name.node"; - - - /** Configuration for the Router HTTP server. */ - private Configuration conf; - - /** Router using this HTTP server. */ - private final Router router; - - /** HTTP server. */ - private HttpServer2 httpServer; - - /** HTTP addresses. 
*/ - private InetSocketAddress httpAddress; - private InetSocketAddress httpsAddress; - - - public RouterHttpServer(Router router) { - super(RouterHttpServer.class.getName()); - this.router = router; - } - - @Override - protected void serviceInit(Configuration configuration) throws Exception { - this.conf = configuration; - - // Get HTTP address - this.httpAddress = conf.getSocketAddr( - DFSConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY, - DFSConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, - DFSConfigKeys.DFS_ROUTER_HTTP_ADDRESS_DEFAULT, - DFSConfigKeys.DFS_ROUTER_HTTP_PORT_DEFAULT); - - // Get HTTPS address - this.httpsAddress = conf.getSocketAddr( - DFSConfigKeys.DFS_ROUTER_HTTPS_BIND_HOST_KEY, - DFSConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, - DFSConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_DEFAULT, - DFSConfigKeys.DFS_ROUTER_HTTPS_PORT_DEFAULT); - - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - // Build and start server - String webApp = "router"; - HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN( - this.conf, this.httpAddress, this.httpsAddress, webApp, - DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY, - DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY); - - this.httpServer = builder.build(); - - this.httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, this.router); - this.httpServer.setAttribute(JspHelper.CURRENT_CONF, this.conf); - setupServlets(this.httpServer, this.conf); - - this.httpServer.start(); - - // The server port can be ephemeral... 
ensure we have the correct info - InetSocketAddress listenAddress = this.httpServer.getConnectorAddress(0); - if (listenAddress != null) { - this.httpAddress = new InetSocketAddress(this.httpAddress.getHostName(), - listenAddress.getPort()); - } - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - if(this.httpServer != null) { - this.httpServer.stop(); - } - super.serviceStop(); - } - - private static void setupServlets( - HttpServer2 httpServer, Configuration conf) { - // TODO Add servlets for FSCK, etc - } - - public InetSocketAddress getHttpAddress() { - return this.httpAddress; - } - - public InetSocketAddress getHttpsAddress() { - return this.httpsAddress; - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org