Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AAB1D200CDE for ; Tue, 8 Aug 2017 23:44:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A7507167F96; Tue, 8 Aug 2017 21:44:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5090D167F84 for ; Tue, 8 Aug 2017 23:44:55 +0200 (CEST) Received: (qmail 86731 invoked by uid 500); 8 Aug 2017 21:44:54 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 86524 invoked by uid 99); 8 Aug 2017 21:44:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Aug 2017 21:44:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9515ADFA44; Tue, 8 Aug 2017 21:44:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Tue, 08 Aug 2017 21:44:53 -0000 Message-Id: <5a13bcaa97f84dbf85d19a0ac05b2536@git.apache.org> In-Reply-To: <33289306216f4263b356aa66d7053e33@git.apache.org> References: <33289306216f4263b356aa66d7053e33@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: HDFS-10646. Federation admin tool. Contributed by Inigo Goiri. archived-at: Tue, 08 Aug 2017 21:44:57 -0000 HDFS-10646. Federation admin tool. Contributed by Inigo Goiri. 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96ce1278 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96ce1278 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96ce1278 Branch: refs/heads/HDFS-10467 Commit: 96ce12784d3e9b9fa1591f730e200e50d2b53a71 Parents: 64044a4 Author: Inigo Goiri Authored: Tue Aug 8 14:44:43 2017 -0700 Committer: Inigo Goiri Committed: Tue Aug 8 14:44:43 2017 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/pom.xml | 1 + .../hadoop-hdfs/src/main/bin/hdfs | 4 + .../hadoop-hdfs/src/main/bin/hdfs.cmd | 7 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 19 ++ .../hdfs/protocolPB/RouterAdminProtocolPB.java | 44 +++ ...uterAdminProtocolServerSideTranslatorPB.java | 151 ++++++++ .../RouterAdminProtocolTranslatorPB.java | 150 ++++++++ .../resolver/MembershipNamenodeResolver.java | 34 +- .../hdfs/server/federation/router/Router.java | 52 +++ .../federation/router/RouterAdminServer.java | 183 ++++++++++ .../server/federation/router/RouterClient.java | 76 +++++ .../hdfs/tools/federation/RouterAdmin.java | 341 +++++++++++++++++++ .../hdfs/tools/federation/package-info.java | 28 ++ .../src/main/proto/RouterProtocol.proto | 47 +++ .../src/main/resources/hdfs-default.xml | 46 +++ .../server/federation/RouterConfigBuilder.java | 26 ++ .../server/federation/RouterDFSCluster.java | 43 ++- .../server/federation/StateStoreDFSCluster.java | 148 ++++++++ .../federation/router/TestRouterAdmin.java | 261 ++++++++++++++ 19 files changed, 1643 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 
a2233b5..b9edfe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -337,6 +337,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> editlog.proto fsimage.proto FederationProtocol.proto + RouterProtocol.proto http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs index 6606979..dfe229e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs @@ -42,6 +42,7 @@ function hadoop_usage hadoop_add_subcommand "diskbalancer" "Distributes data evenly among disks on a given node" hadoop_add_subcommand "envvars" "display computed Hadoop environment variables" hadoop_add_subcommand "ec" "run a HDFS ErasureCoding CLI" + hadoop_add_subcommand "federation" "manage Router-based federation" hadoop_add_subcommand "fetchdt" "fetch a delegation token from the NameNode" hadoop_add_subcommand "fsck" "run a DFS filesystem checking utility" hadoop_add_subcommand "getconf" "get config values from configuration" @@ -181,6 +182,9 @@ function hdfscmd_case HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true" HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.federation.router.Router' ;; + federation) + HADOOP_CLASSNAME='org.apache.hadoop.hdfs.tools.federation.RouterAdmin' + ;; secondarynamenode) HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true" HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode' http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd index b9853d6..53bdf70 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd @@ -59,7 +59,7 @@ if "%1" == "--loglevel" ( ) ) - set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router debug + set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto router federation debug for %%i in ( %hdfscommands% ) do ( if %hdfs-command% == %%i set hdfscommand=true ) @@ -184,6 +184,11 @@ goto :eof set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS% goto :eof +:federation + set CLASS=org.apache.hadoop.hdfs.tools.federation.RouterAdmin + set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ROUTER_OPTS% + goto :eof + :debug set CLASS=org.apache.hadoop.hdfs.tools.DebugAdmin goto :eof http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 1daf10a..2ee50aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1171,6 +1171,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String FEDERATION_STORE_PREFIX = FEDERATION_ROUTER_PREFIX + "store."; + public static final String DFS_ROUTER_STORE_ENABLE = + FEDERATION_STORE_PREFIX + "enable"; + public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = 
true; + public static final String FEDERATION_STORE_SERIALIZER_CLASS = DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer"; public static final Class @@ -1197,6 +1201,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); + // HDFS Router-based federation admin + public static final String DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "admin.handler.count"; + public static final int DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT = 1; + public static final int DFS_ROUTER_ADMIN_PORT_DEFAULT = 8111; + public static final String DFS_ROUTER_ADMIN_ADDRESS_KEY = + FEDERATION_ROUTER_PREFIX + "admin-address"; + public static final String DFS_ROUTER_ADMIN_ADDRESS_DEFAULT = + "0.0.0.0:" + DFS_ROUTER_ADMIN_PORT_DEFAULT; + public static final String DFS_ROUTER_ADMIN_BIND_HOST_KEY = + FEDERATION_ROUTER_PREFIX + "admin-bind-host"; + public static final String DFS_ROUTER_ADMIN_ENABLE = + FEDERATION_ROUTER_PREFIX + "admin.enable"; + public static final boolean DFS_ROUTER_ADMIN_ENABLE_DEFAULT = true; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java new file mode 100644 index 0000000..96fa794 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolPB.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.TokenInfo; + +/** + * Protocol that clients use to communicate with the NameNode. + * Note: This extends the protocolbuffer service based interface to + * add annotations required for security. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Stable +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY) +@TokenInfo(DelegationTokenSelector.class) +@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, + protocolVersion = 1) +public interface RouterAdminProtocolPB extends + RouterAdminProtocolService.BlockingInterface { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..415bbd9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import 
org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is used on the server side. Calls come across the wire for the + * protocol {@link RouterAdminProtocolPB}. This class translates the PB data + * types to the native data types used inside the HDFS Router as specified in + * the generic RouterAdminProtocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class RouterAdminProtocolServerSideTranslatorPB implements + RouterAdminProtocolPB { + + private final RouterAdminServer server; + + /** + * Constructor. + * @param server The Router admin server. 
+ * @throws IOException + */ + public RouterAdminProtocolServerSideTranslatorPB(RouterAdminServer server) + throws IOException { + this.server = server; + } + + @Override + public AddMountTableEntryResponseProto addMountTableEntry( + RpcController controller, AddMountTableEntryRequestProto request) + throws ServiceException { + + try { + AddMountTableEntryRequest req = + new AddMountTableEntryRequestPBImpl(request); + AddMountTableEntryResponse response = server.addMountTableEntry(req); + AddMountTableEntryResponsePBImpl responsePB = + (AddMountTableEntryResponsePBImpl)response; + return responsePB.getProto(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + /** + * Remove an entry from the mount table. + */ + @Override + public RemoveMountTableEntryResponseProto removeMountTableEntry( + RpcController controller, RemoveMountTableEntryRequestProto request) + throws ServiceException { + try { + RemoveMountTableEntryRequest req = + new RemoveMountTableEntryRequestPBImpl(request); + RemoveMountTableEntryResponse response = + server.removeMountTableEntry(req); + RemoveMountTableEntryResponsePBImpl responsePB = + (RemoveMountTableEntryResponsePBImpl)response; + return responsePB.getProto(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + /** + * Get matching mount table entries. + */ + @Override + public GetMountTableEntriesResponseProto getMountTableEntries( + RpcController controller, GetMountTableEntriesRequestProto request) + throws ServiceException { + try { + GetMountTableEntriesRequest req = + new GetMountTableEntriesRequestPBImpl(request); + GetMountTableEntriesResponse response = server.getMountTableEntries(req); + GetMountTableEntriesResponsePBImpl responsePB = + (GetMountTableEntriesResponsePBImpl)response; + return responsePB.getProto(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + /** + * Update a single mount table entry. 
+ */ + @Override + public UpdateMountTableEntryResponseProto updateMountTableEntry( + RpcController controller, UpdateMountTableEntryRequestProto request) + throws ServiceException { + try { + UpdateMountTableEntryRequest req = + new UpdateMountTableEntryRequestPBImpl(request); + UpdateMountTableEntryResponse response = + server.updateMountTableEntry(req); + UpdateMountTableEntryResponsePBImpl responsePB = + (UpdateMountTableEntryResponsePBImpl)response; + return responsePB.getProto(); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java new file mode 100644 index 0000000..43663ac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import 
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolMetaInterface; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcClientUtil; + +import com.google.protobuf.ServiceException; + +/** + * This class forwards NN's ClientProtocol calls as RPC calls to the NN server + * while translating from the parameter types used in ClientProtocol to the + * new PB types. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class RouterAdminProtocolTranslatorPB + implements ProtocolMetaInterface, MountTableManager, + Closeable, ProtocolTranslator { + final private RouterAdminProtocolPB rpcProxy; + + public RouterAdminProtocolTranslatorPB(RouterAdminProtocolPB proxy) { + rpcProxy = proxy; + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override + public boolean isMethodSupported(String methodName) throws IOException { + return RpcClientUtil.isMethodSupported(rpcProxy, + RouterAdminProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(RouterAdminProtocolPB.class), methodName); + } + + @Override + public AddMountTableEntryResponse addMountTableEntry( + AddMountTableEntryRequest request) throws IOException { + AddMountTableEntryRequestPBImpl requestPB = + (AddMountTableEntryRequestPBImpl)request; + AddMountTableEntryRequestProto proto = requestPB.getProto(); + try { + AddMountTableEntryResponseProto response = + rpcProxy.addMountTableEntry(null, proto); + return new AddMountTableEntryResponsePBImpl(response); + } catch (ServiceException e) { + throw new IOException(ProtobufHelper.getRemoteException(e).getMessage()); + } + } + + @Override + public UpdateMountTableEntryResponse updateMountTableEntry( + UpdateMountTableEntryRequest request) throws IOException { + UpdateMountTableEntryRequestPBImpl requestPB = + (UpdateMountTableEntryRequestPBImpl)request; + UpdateMountTableEntryRequestProto proto = requestPB.getProto(); + try { + UpdateMountTableEntryResponseProto response = + rpcProxy.updateMountTableEntry(null, proto); + return new UpdateMountTableEntryResponsePBImpl(response); + } catch (ServiceException e) { + throw new IOException(ProtobufHelper.getRemoteException(e).getMessage()); + } + } + + @Override + public RemoveMountTableEntryResponse removeMountTableEntry( + 
RemoveMountTableEntryRequest request) throws IOException { + RemoveMountTableEntryRequestPBImpl requestPB = + (RemoveMountTableEntryRequestPBImpl)request; + RemoveMountTableEntryRequestProto proto = requestPB.getProto(); + try { + RemoveMountTableEntryResponseProto responseProto = + rpcProxy.removeMountTableEntry(null, proto); + return new RemoveMountTableEntryResponsePBImpl(responseProto); + } catch (ServiceException e) { + throw new IOException(ProtobufHelper.getRemoteException(e).getMessage()); + } + } + + @Override + public GetMountTableEntriesResponse getMountTableEntries( + GetMountTableEntriesRequest request) throws IOException { + GetMountTableEntriesRequestPBImpl requestPB = + (GetMountTableEntriesRequestPBImpl)request; + GetMountTableEntriesRequestProto proto = requestPB.getProto(); + try { + GetMountTableEntriesResponseProto response = + rpcProxy.getMountTableEntries(null, proto); + return new GetMountTableEntriesResponsePBImpl(response); + } catch (ServiceException e) { + throw new IOException(ProtobufHelper.getRemoteException(e).getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index b0ced24..d974c78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -61,7 +61,7 @@ public class MembershipNamenodeResolver /** Reference to the State 
Store. */ private final StateStoreService stateStore; /** Membership State Store interface. */ - private final MembershipStore membershipInterface; + private MembershipStore membershipInterface; /** Parent router ID. */ private String routerId; @@ -82,25 +82,27 @@ public class MembershipNamenodeResolver if (this.stateStore != null) { // Request cache updates from the state store this.stateStore.registerCacheExternal(this); - - // Initialize the interface to get the membership - this.membershipInterface = this.stateStore.getRegisteredRecordStore( - MembershipStore.class); - } else { - this.membershipInterface = null; } + } + private synchronized MembershipStore getMembershipStore() throws IOException { if (this.membershipInterface == null) { - throw new IOException("State Store does not have an interface for " + - MembershipStore.class.getSimpleName()); + this.membershipInterface = this.stateStore.getRegisteredRecordStore( + MembershipStore.class); + if (this.membershipInterface == null) { + throw new IOException("State Store does not have an interface for " + + MembershipStore.class.getSimpleName()); + } } + return this.membershipInterface; } @Override public boolean loadCache(boolean force) { // Our cache depends on the store, update it first try { - this.membershipInterface.loadCache(force); + MembershipStore membership = getMembershipStore(); + membership.loadCache(force); } catch (IOException e) { LOG.error("Cannot update membership from the State Store", e); } @@ -126,8 +128,9 @@ public class MembershipNamenodeResolver GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest.newInstance(partial); + MembershipStore membership = getMembershipStore(); GetNamenodeRegistrationsResponse response = - this.membershipInterface.getNamenodeRegistrations(request); + membership.getNamenodeRegistrations(request); List records = response.getNamenodeMemberships(); if (records != null && records.size() == 1) { @@ -135,7 +138,7 @@ public class 
MembershipNamenodeResolver UpdateNamenodeRegistrationRequest updateRequest = UpdateNamenodeRegistrationRequest.newInstance( record.getNameserviceId(), record.getNamenodeId(), ACTIVE); - this.membershipInterface.updateNamenodeRegistration(updateRequest); + membership.updateNamenodeRegistration(updateRequest); } } catch (StateStoreUnavailableException e) { LOG.error("Cannot update {} as active, State Store unavailable", address); @@ -226,14 +229,14 @@ public class MembershipNamenodeResolver NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance(); request.setNamenodeMembership(record); - return this.membershipInterface.namenodeHeartbeat(request).getResult(); + return getMembershipStore().namenodeHeartbeat(request).getResult(); } @Override public Set getNamespaces() throws IOException { GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); GetNamespaceInfoResponse response = - this.membershipInterface.getNamespaceInfo(request); + getMembershipStore().getNamespaceInfo(request); return response.getNamespaceInfo(); } @@ -259,8 +262,9 @@ public class MembershipNamenodeResolver // Retrieve a list of all registrations that match this query. // This may include all NN records for a namespace/blockpool, including // duplicate records for the same NN from different routers. 
+ MembershipStore membershipStore = getMembershipStore(); GetNamenodeRegistrationsResponse response = - this.membershipInterface.getNamenodeRegistrations(request); + membershipStore.getNamenodeRegistrations(request); List memberships = response.getNamenodeMemberships(); if (!addExpired || !addUnavailable) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 213a58f..fcbd2eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -81,6 +81,10 @@ public class Router extends CompositeService { private RouterRpcServer rpcServer; private InetSocketAddress rpcAddress; + /** RPC interface for the admin. */ + private RouterAdminServer adminServer; + private InetSocketAddress adminAddress; + /** Interface with the State Store. 
*/ private StateStoreService stateStore; @@ -116,6 +120,14 @@ public class Router extends CompositeService { protected void serviceInit(Configuration configuration) throws Exception { this.conf = configuration; + if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, + DFSConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) { + // Service that maintains the State Store connection + this.stateStore = new StateStoreService(); + addService(this.stateStore); + } + // Resolver to track active NNs this.namenodeResolver = newActiveNamenodeResolver( this.conf, this.stateStore); @@ -139,6 +151,14 @@ public class Router extends CompositeService { } if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE, + DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) { + // Create admin server + this.adminServer = createAdminServer(); + addService(this.adminServer); + } + + if (conf.getBoolean( DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) { @@ -264,6 +284,38 @@ public class Router extends CompositeService { } ///////////////////////////////////////////////////////// + // Admin server + ///////////////////////////////////////////////////////// + + /** + * Create a new router admin server to handle the router admin interface. + * + * @return RouterAdminServer + * @throws IOException If the admin server was not successfully started. + */ + protected RouterAdminServer createAdminServer() throws IOException { + return new RouterAdminServer(this.conf, this); + } + + /** + * Set the current Admin socket for the router. + * + * @param adminAddress Admin RPC address. + */ + protected void setAdminServerAddress(InetSocketAddress address) { + this.adminAddress = address; + } + + /** + * Get the current Admin socket address for the router. + * + * @return InetSocketAddress Admin address. 
+ */ + public InetSocketAddress getAdminServerAddress() { + return adminAddress; + } + + ///////////////////////////////////////////////////////// // Namenode heartbeat monitors ///////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java new file mode 100644 index 0000000..7687216 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; + +/** + * This class is responsible for handling all of the Admin calls to the HDFS + * router. It is created, started, and stopped by {@link Router}. 
+ */ +public class RouterAdminServer extends AbstractService + implements MountTableManager { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterAdminServer.class); + + private Configuration conf; + + private final Router router; + + private MountTableStore mountTableStore; + + /** The Admin server that listens to requests from clients. */ + private final Server adminServer; + private final InetSocketAddress adminAddress; + + public RouterAdminServer(Configuration conf, Router router) + throws IOException { + super(RouterAdminServer.class.getName()); + + this.conf = conf; + this.router = router; + + int handlerCount = this.conf.getInt( + DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY, + DFSConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT); + + RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class, + ProtobufRpcEngine.class); + + RouterAdminProtocolServerSideTranslatorPB routerAdminProtocolTranslator = + new RouterAdminProtocolServerSideTranslatorPB(this); + BlockingService clientNNPbService = RouterAdminProtocolService. + newReflectiveBlockingService(routerAdminProtocolTranslator); + + InetSocketAddress confRpcAddress = conf.getSocketAddr( + DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, + DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT, + DFSConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT); + + String bindHost = conf.get( + DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, + confRpcAddress.getHostName()); + LOG.info("Admin server binding to {}:{}", + bindHost, confRpcAddress.getPort()); + + this.adminServer = new RPC.Builder(this.conf) + .setProtocol(RouterAdminProtocolPB.class) + .setInstance(clientNNPbService) + .setBindAddress(bindHost) + .setPort(confRpcAddress.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .build(); + + // The RPC-server port can be ephemeral... 
ensure we have the correct info + InetSocketAddress listenAddress = this.adminServer.getListenerAddress(); + this.adminAddress = new InetSocketAddress( + confRpcAddress.getHostName(), listenAddress.getPort()); + router.setAdminServerAddress(this.adminAddress); + } + + /** Allow access to the client RPC server for testing. */ + @VisibleForTesting + Server getAdminServer() { + return this.adminServer; + } + + private MountTableStore getMountTableStore() throws IOException { + if (this.mountTableStore == null) { + this.mountTableStore = router.getStateStore().getRegisteredRecordStore( + MountTableStore.class); + if (this.mountTableStore == null) { + throw new IOException("Mount table state store is not available."); + } + } + return this.mountTableStore; + } + + /** + * Get the RPC address of the admin service. + * @return Administration service RPC address. + */ + public InetSocketAddress getRpcAddress() { + return this.adminAddress; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.conf = configuration; + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.adminServer.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.adminServer != null) { + this.adminServer.stop(); + } + super.serviceStop(); + } + + @Override + public AddMountTableEntryResponse addMountTableEntry( + AddMountTableEntryRequest request) throws IOException { + return getMountTableStore().addMountTableEntry(request); + } + + @Override + public UpdateMountTableEntryResponse updateMountTableEntry( + UpdateMountTableEntryRequest request) throws IOException { + return getMountTableStore().updateMountTableEntry(request); + } + + @Override + public RemoveMountTableEntryResponse removeMountTableEntry( + RemoveMountTableEntryRequest request) throws IOException { + return getMountTableStore().removeMountTableEntry(request); + } + + @Override + public 
GetMountTableEntriesResponse getMountTableEntries( + GetMountTableEntriesRequest request) throws IOException { + return getMountTableStore().getMountTableEntries(request); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java new file mode 100644 index 0000000..1f76b98 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Client to connect to the {@link Router} via the admin protocol. + */ +@Private +public class RouterClient implements Closeable { + + private final RouterAdminProtocolTranslatorPB proxy; + private final UserGroupInformation ugi; + + private static RouterAdminProtocolTranslatorPB createRouterProxy( + InetSocketAddress address, Configuration conf, UserGroupInformation ugi) + throws IOException { + + RPC.setProtocolEngine( + conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class); + + AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false); + final long version = RPC.getProtocolVersion(RouterAdminProtocolPB.class); + RouterAdminProtocolPB proxy = RPC.getProtocolProxy( + RouterAdminProtocolPB.class, version, address, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), + RPC.getRpcTimeout(conf), null, + fallbackToSimpleAuth).getProxy(); + + return new RouterAdminProtocolTranslatorPB(proxy); + } + + public RouterClient(InetSocketAddress address, Configuration conf) + throws IOException { + this.ugi = UserGroupInformation.getCurrentUser(); + this.proxy = createRouterProxy(address, conf, ugi); + } + + public MountTableManager getMountTableManager() { + return proxy; + } + + @Override + public synchronized void close() throws IOException { + 
RPC.stopProxy(proxy); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java new file mode 100644 index 0000000..0786419 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.tools.federation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides some Federation administrative access shell commands. 
+ */ +@Private +public class RouterAdmin extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class); + + private RouterClient client; + + public static void main(String[] argv) throws Exception { + Configuration conf = new HdfsConfiguration(); + RouterAdmin admin = new RouterAdmin(conf); + + int res = ToolRunner.run(admin, argv); + System.exit(res); + } + + public RouterAdmin(Configuration conf) { + super(conf); + } + + /** + * Print the usage message. + */ + public void printUsage() { + String usage = "Federation Admin Tools:\n" + + "\t[-add " + + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL]]\n" + + "\t[-rm ]\n" + + "\t[-ls ]\n"; + System.out.println(usage); + } + + @Override + public int run(String[] argv) throws Exception { + if (argv.length < 1) { + System.err.println("Not enough parameters specificed"); + printUsage(); + return -1; + } + + int exitCode = -1; + int i = 0; + String cmd = argv[i++]; + + // Verify that we have enough command line parameters + if ("-add".equals(cmd)) { + if (argv.length < 4) { + System.err.println("Not enough parameters specificed for cmd " + cmd); + printUsage(); + return exitCode; + } + } else if ("-rm".equalsIgnoreCase(cmd)) { + if (argv.length < 2) { + System.err.println("Not enough parameters specificed for cmd " + cmd); + printUsage(); + return exitCode; + } + } + + // Initialize RouterClient + try { + String address = getConf().getTrimmed( + DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); + InetSocketAddress routerSocket = NetUtils.createSocketAddr(address); + client = new RouterClient(routerSocket, getConf()); + } catch (RPC.VersionMismatch v) { + System.err.println( + "Version mismatch between client and server... command aborted"); + return exitCode; + } catch (IOException e) { + System.err.println("Bad connection to Router... 
command aborted"); + return exitCode; + } + + Exception debugException = null; + exitCode = 0; + try { + if ("-add".equals(cmd)) { + if (addMount(argv, i)) { + System.err.println("Successfuly added mount point " + argv[i]); + } + } else if ("-rm".equals(cmd)) { + if (removeMount(argv[i])) { + System.err.println("Successfully removed mount point " + argv[i]); + } + } else if ("-ls".equals(cmd)) { + if (argv.length > 1) { + listMounts(argv[i]); + } else { + listMounts("/"); + } + } else { + printUsage(); + return exitCode; + } + } catch (IllegalArgumentException arge) { + debugException = arge; + exitCode = -1; + System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage()); + printUsage(); + } catch (RemoteException e) { + // This is a error returned by the server. + // Print out the first line of the error message, ignore the stack trace. + exitCode = -1; + debugException = e; + try { + String[] content; + content = e.getLocalizedMessage().split("\n"); + System.err.println(cmd.substring(1) + ": " + content[0]); + e.printStackTrace(); + } catch (Exception ex) { + System.err.println(cmd.substring(1) + ": " + ex.getLocalizedMessage()); + e.printStackTrace(); + debugException = ex; + } + } catch (Exception e) { + exitCode = -1; + debugException = e; + System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage()); + e.printStackTrace(); + } + if (debugException != null) { + LOG.debug("Exception encountered", debugException); + } + return exitCode; + } + + /** + * Add a mount table entry or update if it exists. + * + * @param parameters Parameters for the mount point. + * @param i Index in the parameters. 
+ */ + public boolean addMount(String[] parameters, int i) throws IOException { + // Mandatory parameters + String mount = parameters[i++]; + String[] nss = parameters[i++].split(","); + String dest = parameters[i++]; + + // Optional parameters + boolean readOnly = false; + DestinationOrder order = DestinationOrder.HASH; + while (i < parameters.length) { + if (parameters[i].equals("-readonly")) { + readOnly = true; + } else if (parameters[i].equals("-order")) { + i++; + try { + order = DestinationOrder.valueOf(parameters[i]); + } catch(Exception e) { + System.err.println("Cannot parse order: " + parameters[i]); + } + } + i++; + } + + return addMount(mount, nss, dest, readOnly, order); + } + + /** + * Add a mount table entry or update if it exists. + * + * @param mount Mount point. + * @param nss Namespaces where this is mounted to. + * @param dest Destination path. + * @param readonly If the mount point is read only. + * @param order Order of the destination locations. + * @return If the mount point was added. + * @throws IOException Error adding the mount point. 
+ */ + public boolean addMount(String mount, String[] nss, String dest, + boolean readonly, DestinationOrder order) throws IOException { + // Get the existing entry + MountTableManager mountTable = client.getMountTableManager(); + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(mount); + GetMountTableEntriesResponse getResponse = + mountTable.getMountTableEntries(getRequest); + List results = getResponse.getEntries(); + MountTable existingEntry = null; + for (MountTable result : results) { + if (mount.equals(result.getSourcePath())) { + existingEntry = result; + } + } + + if (existingEntry == null) { + // Create and add the entry if it doesn't exist + Map destMap = new LinkedHashMap<>(); + for (String ns : nss) { + destMap.put(ns, dest); + } + MountTable newEntry = MountTable.newInstance(mount, destMap); + if (readonly) { + newEntry.setReadOnly(true); + } + if (order != null) { + newEntry.setDestOrder(order); + } + AddMountTableEntryRequest request = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(request); + boolean added = addResponse.getStatus(); + if (!added) { + System.err.println("Cannot add mount point " + mount); + } + return added; + } else { + // Update the existing entry if it exists + for (String nsId : nss) { + if (!existingEntry.addDestination(nsId, dest)) { + System.err.println("Cannot add destination at " + nsId + " " + dest); + } + } + if (readonly) { + existingEntry.setReadOnly(true); + } + if (order != null) { + existingEntry.setDestOrder(order); + } + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(existingEntry); + UpdateMountTableEntryResponse updateResponse = + mountTable.updateMountTableEntry(updateRequest); + boolean updated = updateResponse.getStatus(); + if (!updated) { + System.err.println("Cannot update mount point " + mount); + } + return updated; + } + } + + /** + * Remove mount 
point. + * + * @param path Path to remove. + * @throws IOException If it cannot be removed. + */ + public boolean removeMount(String path) throws IOException { + MountTableManager mountTable = client.getMountTableManager(); + RemoveMountTableEntryRequest request = + RemoveMountTableEntryRequest.newInstance(path); + RemoveMountTableEntryResponse response = + mountTable.removeMountTableEntry(request); + boolean removed = response.getStatus(); + if (!removed) { + System.out.println("Cannot remove mount point " + path); + } + return removed; + } + + /** + * List mount points. + * + * @param path Path to list. + * @throws IOException If it cannot be listed. + */ + public void listMounts(String path) throws IOException { + MountTableManager mountTable = client.getMountTableManager(); + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance(path); + GetMountTableEntriesResponse response = + mountTable.getMountTableEntries(request); + List entries = response.getEntries(); + printMounts(entries); + } + + private static void printMounts(List entries) { + System.out.println("Mount Table Entries:"); + System.out.println(String.format( + "%-25s %-25s", + "Source", "Destinations")); + for (MountTable entry : entries) { + StringBuilder destBuilder = new StringBuilder(); + for (RemoteLocation location : entry.getDestinations()) { + if (destBuilder.length() > 0) { + destBuilder.append(","); + } + destBuilder.append(String.format("%s->%s", location.getNameserviceId(), + location.getDest())); + } + System.out.println(String.format("%-25s %-25s", entry.getSourcePath(), + destBuilder.toString())); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java new file mode 100644 index 0000000..466c3d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * It includes the tools to manage the Router-based federation. Includes the + * utilities to add and remove mount table entries. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.tools.federation; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto new file mode 100644 index 0000000..3f43040 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/RouterProtocol.proto @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.hdfs.protocol.proto"; +option java_outer_classname = "RouterProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.hdfs.router; + +import "FederationProtocol.proto"; + +service RouterAdminProtocolService { + /** + * Add a mount table entry. 
+ */ + rpc addMountTableEntry(AddMountTableEntryRequestProto) returns(AddMountTableEntryResponseProto); + + /** + * Update an existing mount table entry without copying files. + */ + rpc updateMountTableEntry(UpdateMountTableEntryRequestProto) returns(UpdateMountTableEntryResponseProto); + + /** + * Remove a mount table entry. + */ + rpc removeMountTableEntry(RemoveMountTableEntryRequestProto) returns(RemoveMountTableEntryResponseProto); + + /** + * Get matching mount entries + */ + rpc getMountTableEntries(GetMountTableEntriesRequestProto) returns(GetMountTableEntriesResponseProto); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 0c1fc32..3b6d947 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4638,6 +4638,44 @@ + + dfs.federation.router.admin.enable + true + + Whether the RPC admin service that handles client requests in the router is + enabled. + + + + + dfs.federation.router.admin-address + 0.0.0.0:8111 + + RPC address that handles the admin requests. + The value of this property will take the form of router-host1:rpc-port. + + + + + dfs.federation.router.admin-bind-host + + + The actual address the RPC admin server will bind to. If this optional + address is set, it overrides only the hostname portion of + dfs.federation.router.admin-address. This is useful for making the admin + server listen on all interfaces by setting it to 0.0.0.0. + + + + + dfs.federation.router.admin.handler.count + 1 + + The number of server threads for the router to handle RPC requests from + admin.
+ + + dfs.federation.router.file.resolver.client.class org.apache.hadoop.hdfs.server.federation.MockResolver @@ -4655,6 +4693,14 @@ + dfs.federation.router.store.enable + true + + If the Router connects to the State Store. + + + + dfs.federation.router.store.serializer org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 21555c5..cac5e6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -28,8 +28,10 @@ public class RouterConfigBuilder { private Configuration conf; private boolean enableRpcServer = false; + private boolean enableAdminServer = false; private boolean enableHeartbeat = false; private boolean enableLocalHeartbeat = false; + private boolean enableStateStore = false; public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -41,8 +43,10 @@ public class RouterConfigBuilder { public RouterConfigBuilder all() { this.enableRpcServer = true; + this.enableAdminServer = true; this.enableHeartbeat = true; this.enableLocalHeartbeat = true; + this.enableStateStore = true; return this; } @@ -56,21 +60,43 @@ public class RouterConfigBuilder { return this; } + public RouterConfigBuilder admin(boolean enable) { + this.enableAdminServer = enable; + return this; + } + public RouterConfigBuilder heartbeat(boolean enable) { 
this.enableHeartbeat = enable; return this; } + public RouterConfigBuilder stateStore(boolean enable) { + this.enableStateStore = enable; + return this; + } + public RouterConfigBuilder rpc() { return this.rpc(true); } + public RouterConfigBuilder admin() { + return this.admin(true); + } + public RouterConfigBuilder heartbeat() { return this.heartbeat(true); } + public RouterConfigBuilder stateStore() { + return this.stateStore(true); + } + public Configuration build() { + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, + this.enableStateStore); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer); + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE, + this.enableAdminServer); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, this.enableHeartbeat); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java index 0830c19..1ee49d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -25,8 +25,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; @@ -46,6 +50,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; @@ -67,6 +72,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServi import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -101,6 +107,15 @@ public class RouterDFSCluster { /** Mini cluster. */ private MiniDFSCluster cluster; + protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS = + TimeUnit.SECONDS.toMillis(5); + protected static final long DEFAULT_CACHE_INTERVAL_MS = + TimeUnit.SECONDS.toMillis(5); + /** Heartbeat interval in milliseconds. */ + private long heartbeatInterval; + /** Cache flush interval in milliseconds. 
*/ + private long cacheFlushInterval; + /** Router configuration overrides. */ private Configuration routerOverrides; /** Namenode configuration overrides. */ @@ -118,6 +133,7 @@ public class RouterDFSCluster { private int rpcPort; private DFSClient client; private Configuration conf; + private RouterClient adminClient; private URI fileSystemUri; public RouterContext(Configuration conf, String nsId, String nnId) @@ -183,6 +199,15 @@ public class RouterDFSCluster { }); } + public RouterClient getAdminClient() throws IOException { + if (adminClient == null) { + InetSocketAddress routerSocket = router.getAdminServerAddress(); + LOG.info("Connecting to router admin at {}", routerSocket); + adminClient = new RouterClient(routerSocket, conf); + } + return adminClient; + } + public DFSClient getClient() throws IOException, URISyntaxException { if (client == null) { LOG.info("Connecting to router at {}", fileSystemUri); @@ -304,13 +329,22 @@ public class RouterDFSCluster { } } - public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { + public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes, + long heartbeatInterval, long cacheFlushInterval) { this.highAvailability = ha; + this.heartbeatInterval = heartbeatInterval; + this.cacheFlushInterval = cacheFlushInterval; configureNameservices(numNameservices, numNamenodes); } public RouterDFSCluster(boolean ha, int numNameservices) { - this(ha, numNameservices, 2); + this(ha, numNameservices, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); + } + + public RouterDFSCluster(boolean ha, int numNameservices, int numNamnodes) { + this(ha, numNameservices, numNamnodes, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); } /** @@ -404,7 +438,12 @@ public class RouterDFSCluster { conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0"); + 
conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0"); + conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0)); + conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval); + conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval); // Use mock resolver classes conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, http://git-wip-us.apache.org/repos/asf/hadoop/blob/96ce1278/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java new file mode 100644 index 0000000..e42ab50 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * Test utility to mimic a federated HDFS cluster with a router and a state + * store. 
+ */ +public class StateStoreDFSCluster extends RouterDFSCluster { + + private static final Class DEFAULT_FILE_RESOLVER = + MountTableResolver.class; + private static final Class DEFAULT_NAMENODE_RESOLVER = + MembershipNamenodeResolver.class; + + public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes, + long heartbeatInterval, long cacheFlushInterval) + throws IOException, InterruptedException { + this(ha, numNameservices, numNamenodes, heartbeatInterval, + cacheFlushInterval, DEFAULT_FILE_RESOLVER); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes, + long heartbeatInterval, long cacheFlushInterval, Class fileResolver) + throws IOException, InterruptedException { + super(ha, numNameservices, numNamenodes, heartbeatInterval, + cacheFlushInterval); + + // Attach state store and resolvers to router + Configuration stateStoreConfig = getStateStoreConfiguration(); + // Use state store backed resolvers + stateStoreConfig.setClass( + DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class); + stateStoreConfig.setClass( + DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + fileResolver, FileSubclusterResolver.class); + this.addRouterOverrides(stateStoreConfig); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices, + Class fileResolver) throws IOException, InterruptedException { + this(ha, numNameservices, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices) + throws IOException, InterruptedException { + this(ha, numNameservices, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices, + int numNamnodes) throws IOException, InterruptedException { + this(ha, numNameservices, numNamnodes, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); + } + + 
///////////////////////////////////////////////////////////////////////////// + // State Store Test Fixtures + ///////////////////////////////////////////////////////////////////////////// + + /** + * Adds test fixtures for NN registration for each NN nameservice -> NS + * namenode -> NN rpcAddress -> 0.0.0.0:0 webAddress -> 0.0.0.0:0 state -> + * STANDBY safeMode -> false blockPool -> test. + * + * @param stateStore State Store. + * @throws IOException If it cannot register. + */ + public void createTestRegistration(StateStoreService stateStore) + throws IOException { + List entries = new ArrayList(); + for (NamenodeContext nn : this.getNamenodes()) { + MembershipState entry = createMockRegistrationForNamenode( + nn.getNameserviceId(), nn.getNamenodeId(), + FederationNamenodeServiceState.STANDBY); + entries.add(entry); + } + synchronizeRecords( + stateStore, entries, MembershipState.class); + } + + public void createTestMountTable(StateStoreService stateStore) + throws IOException { + List mounts = generateMockMountTable(); + synchronizeRecords(stateStore, mounts, MountTable.class); + stateStore.refreshCaches(); + } + + public List generateMockMountTable() throws IOException { + // create table entries + List entries = new ArrayList<>(); + for (String ns : this.getNameservices()) { + Map destMap = new HashMap<>(); + destMap.put(ns, getNamenodePathForNS(ns)); + + // Direct path + String fedPath = getFederatedPathForNS(ns); + MountTable entry = MountTable.newInstance(fedPath, destMap); + entries.add(entry); + } + + // Root path goes to the first nameservice + Map destMap = new HashMap<>(); + String ns0 = this.getNameservices().get(0); + destMap.put(ns0, "/"); + MountTable entry = MountTable.newInstance("/", destMap); + entries.add(entry); + return entries; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org