hadoop-common-commits mailing list archives

From hanishakon...@apache.org
Subject [38/50] [abbrv] hadoop git commit: HDFS-13215. RBF: Move Router to its own module. Contributed by Wei Yan
Date Wed, 21 Mar 2018 23:48:14 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java
new file mode 100644
index 0000000..b36e459
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Client to connect to the {@link Router} via the admin protocol.
+ */
+@Private
+public class RouterClient implements Closeable {
+
+  private final RouterAdminProtocolTranslatorPB proxy;
+  private final UserGroupInformation ugi;
+
+  private static RouterAdminProtocolTranslatorPB createRouterProxy(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+          throws IOException {
+
+    RPC.setProtocolEngine(
+        conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class);
+
+    AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
+    final long version = RPC.getProtocolVersion(RouterAdminProtocolPB.class);
+    RouterAdminProtocolPB proxy = RPC.getProtocolProxy(
+        RouterAdminProtocolPB.class, version, address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf),
+        RPC.getRpcTimeout(conf), null,
+        fallbackToSimpleAuth).getProxy();
+
+    return new RouterAdminProtocolTranslatorPB(proxy);
+  }
+
+  public RouterClient(InetSocketAddress address, Configuration conf)
+      throws IOException {
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.proxy = createRouterProxy(address, conf, ugi);
+  }
+
+  public MountTableManager getMountTableManager() {
+    return proxy;
+  }
+
+  public RouterStateManager getRouterStateManager() {
+    return proxy;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    RPC.stopProxy(proxy);
+  }
+}
\ No newline at end of file
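
For context, a minimal usage sketch of the new client (not part of the patch; the admin address and try-with-resources form are illustrative assumptions):

  Configuration conf = new Configuration();
  // Hypothetical Router admin RPC endpoint.
  InetSocketAddress routerAdmin =
      NetUtils.createSocketAddr("router.example.com:8111");
  // RouterClient implements Closeable, so try-with-resources cleans up.
  try (RouterClient client = new RouterClient(routerAdmin, conf)) {
    MountTableManager mountTable = client.getMountTableManager();
    // ... issue mount table admin requests through the proxy ...
  }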

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
new file mode 100644
index 0000000..fe172c2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the Router's current state in the State
+ * Store.
+ */
+public class RouterHeartbeatService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterHeartbeatService.class);
+
+  /** Router we are heartbeating. */
+  private final Router router;
+
+  /**
+   * Create a new Router heartbeat service.
+   *
+   * @param router Router to heartbeat.
+   */
+  public RouterHeartbeatService(Router router) {
+    super(RouterHeartbeatService.class.getSimpleName());
+    this.router = router;
+  }
+
+  /**
+   * Trigger the update of the Router state asynchronously.
+   */
+  protected void updateStateAsync() {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        updateStateStore();
+      }
+    }, "Router Heartbeat Async");
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  /**
+   * Update the state of the Router in the State Store.
+   */
+  @VisibleForTesting
+  synchronized void updateStateStore() {
+    String routerId = router.getRouterId();
+    if (routerId == null) {
+      LOG.error("Cannot heartbeat for router: unknown router id");
+      return;
+    }
+    if (isStoreAvailable()) {
+      RouterStore routerStore = router.getRouterStateManager();
+      try {
+        RouterState record = RouterState.newInstance(
+            routerId, router.getStartTime(), router.getRouterState());
+        StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance(
+            getStateStoreVersion(MembershipStore.class),
+            getStateStoreVersion(MountTableStore.class));
+        record.setStateStoreVersion(stateStoreVersion);
+        RouterHeartbeatRequest request =
+            RouterHeartbeatRequest.newInstance(record);
+        RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
+        if (!response.getStatus()) {
+          LOG.warn("Cannot heartbeat router {}", routerId);
+        } else {
+          LOG.debug("Router heartbeat for router {}", routerId);
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot heartbeat router {}: {}", routerId, e.getMessage());
+      }
+    } else {
+      LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId);
+    }
+  }
+
+  /**
+   * Get the version of the data in the State Store.
+   *
+   * @param clazz Class in the State Store.
+   * @return Version of the data.
+   */
+  private <R extends BaseRecord, S extends RecordStore<R>>
+      long getStateStoreVersion(final Class<S> clazz) {
+    long version = -1;
+    try {
+      StateStoreService stateStore = router.getStateStore();
+      S recordStore = stateStore.getRegisteredRecordStore(clazz);
+      if (recordStore != null) {
+        if (recordStore instanceof CachedRecordStore) {
+          CachedRecordStore<R> cachedRecordStore =
+              (CachedRecordStore<R>) recordStore;
+          List<R> records = cachedRecordStore.getCachedRecords();
+          for (BaseRecord record : records) {
+            if (record.getDateModified() > version) {
+              version = record.getDateModified();
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot get version for {}: {}", clazz, e.getMessage());
+    }
+    return version;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    long interval = conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS,
+        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.setIntervalMs(interval);
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    updateStateStore();
+  }
+
+  private boolean isStoreAvailable() {
+    if (router.getRouterStateManager() == null) {
+      return false;
+    }
+    if (router.getStateStore() == null) {
+      return false;
+    }
+    return router.getStateStore().isDriverReady();
+  }
+}
\ No newline at end of file
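
For context, a sketch of wiring this service up with a custom heartbeat interval (the one-second value and the pre-built Router instance are illustrative assumptions):

  Configuration conf = new Configuration();
  // Heartbeat the Router state into the State Store every second.
  conf.setTimeDuration(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS,
      1, TimeUnit.SECONDS);
  RouterHeartbeatService heartbeat = new RouterHeartbeatService(router);
  heartbeat.init(conf);  // serviceInit() picks up the interval
  heartbeat.start();     // periodicInvoke() then calls updateStateStore()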

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java
new file mode 100644
index 0000000..93a9cff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Web interface for the {@link Router}. It exposes the Web UI and the WebHDFS
+ * methods from {@link RouterWebHdfsMethods}.
+ */
+public class RouterHttpServer extends AbstractService {
+
+  protected static final String NAMENODE_ATTRIBUTE_KEY = "name.node";
+
+
+  /** Configuration for the Router HTTP server. */
+  private Configuration conf;
+
+  /** Router using this HTTP server. */
+  private final Router router;
+
+  /** HTTP server. */
+  private HttpServer2 httpServer;
+
+  /** HTTP addresses. */
+  private InetSocketAddress httpAddress;
+  private InetSocketAddress httpsAddress;
+
+
+  public RouterHttpServer(Router router) {
+    super(RouterHttpServer.class.getName());
+    this.router = router;
+  }
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+    this.conf = configuration;
+
+    // Get HTTP address
+    this.httpAddress = conf.getSocketAddr(
+        RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY,
+        RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY,
+        RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_DEFAULT,
+        RBFConfigKeys.DFS_ROUTER_HTTP_PORT_DEFAULT);
+
+    // Get HTTPs address
+    this.httpsAddress = conf.getSocketAddr(
+        RBFConfigKeys.DFS_ROUTER_HTTPS_BIND_HOST_KEY,
+        RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY,
+        RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_DEFAULT,
+        RBFConfigKeys.DFS_ROUTER_HTTPS_PORT_DEFAULT);
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Build and start server
+    String webApp = "router";
+    HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(
+        this.conf, this.httpAddress, this.httpsAddress, webApp,
+        DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
+
+    this.httpServer = builder.build();
+
+    this.httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, this.router);
+    this.httpServer.setAttribute(JspHelper.CURRENT_CONF, this.conf);
+    setupServlets(this.httpServer, this.conf);
+
+    this.httpServer.start();
+
+    // The server port can be ephemeral... ensure we have the correct info
+    InetSocketAddress listenAddress = this.httpServer.getConnectorAddress(0);
+    if (listenAddress != null) {
+      this.httpAddress = new InetSocketAddress(this.httpAddress.getHostName(),
+          listenAddress.getPort());
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.httpServer != null) {
+      this.httpServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  private static void setupServlets(
+      HttpServer2 httpServer, Configuration conf) {
+    // TODO Add servlets for FSCK, etc
+  }
+
+  public InetSocketAddress getHttpAddress() {
+    return this.httpAddress;
+  }
+
+  public InetSocketAddress getHttpsAddress() {
+    return this.httpsAddress;
+  }
+}
\ No newline at end of file
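
A sketch of how the address keys read in serviceInit() might be configured (the bind address and port are illustrative assumptions):

  Configuration conf = new Configuration();
  // Bind the Router web UI on all interfaces; configuring port 0 would pick
  // an ephemeral port that serviceStart() reads back from the connector.
  conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:50071");
  RouterHttpServer web = new RouterHttpServer(router);
  web.init(conf);
  web.start();
  InetSocketAddress bound = web.getHttpAddress(); // actual listen address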

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
new file mode 100644
index 0000000..851538a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+
+/**
+ * This class is for maintaining the various Router activity statistics
+ * and publishing them through the metrics interfaces.
+ */
+@Metrics(name="RouterActivity", about="Router metrics", context="dfs")
+public class RouterMetrics {
+
+  private final MetricsRegistry registry = new MetricsRegistry("router");
+
+  @Metric("Duration in SafeMode at startup in msec")
+  private MutableGaugeInt safeModeTime;
+
+  private JvmMetrics jvmMetrics = null;
+
+  RouterMetrics(
+      String processName, String sessionId, final JvmMetrics jvmMetrics) {
+    this.jvmMetrics = jvmMetrics;
+    registry.tag(ProcessName, processName).tag(SessionId, sessionId);
+  }
+
+  public static RouterMetrics create(Configuration conf) {
+    String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
+    String processName = "Router";
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms);
+
+    return ms.register(new RouterMetrics(processName, sessionId, jm));
+  }
+
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+
+  public void shutdown() {
+    DefaultMetricsSystem.shutdown();
+  }
+
+  public void setSafeModeTime(long elapsed) {
+    safeModeTime.set((int) elapsed);
+  }
+}
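
A sketch of the expected lifecycle of these metrics (the elapsed time is an illustrative assumption):

  RouterMetrics metrics = RouterMetrics.create(conf);
  // Record the time spent in safe mode at startup (example: 1500 ms).
  metrics.setSafeModeTime(1500);
  // ...
  // Note that shutdown() stops the shared DefaultMetricsSystem instance.
  metrics.shutdown();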

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
new file mode 100644
index 0000000..f4debce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Service to manage the metrics of the Router.
+ */
+public class RouterMetricsService extends AbstractService {
+
+  /** Router for these metrics. */
+  private final Router router;
+
+  /** Router metrics. */
+  private RouterMetrics routerMetrics;
+  /** Federation metrics. */
+  private FederationMetrics federationMetrics;
+  /** Namenode mock metrics. */
+  private NamenodeBeanMetrics nnMetrics;
+
+
+  public RouterMetricsService(final Router router) {
+    super(RouterMetricsService.class.getName());
+    this.router = router;
+  }
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+    this.routerMetrics = RouterMetrics.create(configuration);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Wrapper for all the FSNamesystem JMX interfaces
+    this.nnMetrics = new NamenodeBeanMetrics(this.router);
+
+    // Federation MBean JMX interface
+    this.federationMetrics = new FederationMetrics(this.router);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Remove JMX interfaces
+    if (this.federationMetrics != null) {
+      this.federationMetrics.close();
+    }
+
+    // Remove Namenode JMX interfaces
+    if (this.nnMetrics != null) {
+      this.nnMetrics.close();
+    }
+
+    // Shutdown metrics
+    if (this.routerMetrics != null) {
+      this.routerMetrics.shutdown();
+    }
+  }
+
+  /**
+   * Get the metrics system for the Router.
+   *
+   * @return Router metrics.
+   */
+  public RouterMetrics getRouterMetrics() {
+    return this.routerMetrics;
+  }
+
+  /**
+   * Get the federation metrics.
+   *
+   * @return Federation metrics.
+   */
+  public FederationMetrics getFederationMetrics() {
+    return this.federationMetrics;
+  }
+
+  /**
+   * Get the JVM metrics for the Router.
+   *
+   * @return JVM metrics.
+   */
+  public JvmMetrics getJvmMetrics() {
+    if (this.routerMetrics == null) {
+      return null;
+    }
+    return this.routerMetrics.getJvmMetrics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java
new file mode 100644
index 0000000..9d81dce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Class that helps in checking permissions in Router-based federation.
+ */
+public class RouterPermissionChecker extends FSPermissionChecker {
+  static final Log LOG = LogFactory.getLog(RouterPermissionChecker.class);
+
+  /** Mount table default permission. */
+  public static final short MOUNT_TABLE_PERMISSION_DEFAULT = 00755;
+
+  public RouterPermissionChecker(String routerOwner, String supergroup,
+      UserGroupInformation callerUgi) {
+    super(routerOwner, supergroup, callerUgi, null);
+  }
+
+  /**
+   * Whether a mount table entry can be accessed by the current context.
+   *
+   * @param mountTable
+   *          MountTable being accessed
+   * @param access
+   *          type of action being performed on the cache pool
+   * @throws AccessControlException
+   *           if mount table cannot be accessed
+   */
+  public void checkPermission(MountTable mountTable, FsAction access)
+      throws AccessControlException {
+    if (isSuperUser()) {
+      return;
+    }
+
+    FsPermission mode = mountTable.getMode();
+    if (getUser().equals(mountTable.getOwnerName())
+        && mode.getUserAction().implies(access)) {
+      return;
+    }
+
+    if (isMemberOfGroup(mountTable.getGroupName())
+        && mode.getGroupAction().implies(access)) {
+      return;
+    }
+
+    if (!getUser().equals(mountTable.getOwnerName())
+        && !isMemberOfGroup(mountTable.getGroupName())
+        && mode.getOtherAction().implies(access)) {
+      return;
+    }
+
+    throw new AccessControlException(
+        "Permission denied while accessing mount table "
+            + mountTable.getSourcePath()
+            + ": user " + getUser() + " does not have " + access.toString()
+            + " permissions.");
+  }
+}
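
To illustrate the owner/group/other checks above, a small sketch of the FsPermission/FsAction semantics they rely on (the mode value is illustrative):

  // 0755: owner rwx, group r-x, other r-x.
  FsPermission mode = new FsPermission((short) 0755);
  boolean ownerCanWrite = mode.getUserAction().implies(FsAction.WRITE);  // true
  boolean groupCanWrite = mode.getGroupAction().implies(FsAction.WRITE); // false
  boolean otherCanRead = mode.getOtherAction().implies(FsAction.READ);   // true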

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
new file mode 100644
index 0000000..0df34fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+/**
+ * Quota manager for the Router. The manager maintains a
+ * {@link RouterQuotaUsage} cache for the mount table entries and manages
+ * the quota caches.
+ */
+public class RouterQuotaManager {
+  /** Quota usage <MountTable Path, Aggregated QuotaUsage> cache. */
+  private TreeMap<String, RouterQuotaUsage> cache;
+
+  /** Lock to access the quota cache. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  public RouterQuotaManager() {
+    this.cache = new TreeMap<>();
+  }
+
+  /**
+   * Get all the mount quota paths.
+   */
+  public Set<String> getAll() {
+    readLock.lock();
+    try {
+      return this.cache.keySet();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the quota usage of the nearest ancestor whose quota is set.
+   * @param path The path being written.
+   * @return RouterQuotaUsage Quota usage.
+   */
+  public RouterQuotaUsage getQuotaUsage(String path) {
+    readLock.lock();
+    try {
+      RouterQuotaUsage quotaUsage = this.cache.get(path);
+      if (quotaUsage != null && isQuotaSet(quotaUsage)) {
+        return quotaUsage;
+      }
+
+      // If not found, look for its parent path usage value.
+      int pos = path.lastIndexOf(Path.SEPARATOR);
+      if (pos != -1) {
+        String parentPath = path.substring(0, pos);
+        return getQuotaUsage(parentPath);
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    return null;
+  }
+
+  /**
+   * Get children paths (possibly including itself) under the specified
+   * federation path.
+   * @param parentPath Parent federation path to search under.
+   * @return Set<String> Children path set.
+   */
+  public Set<String> getPaths(String parentPath) {
+    readLock.lock();
+    try {
+      String from = parentPath;
+      String to = parentPath + Character.MAX_VALUE;
+      SortedMap<String, RouterQuotaUsage> subMap = this.cache.subMap(from, to);
+
+      Set<String> validPaths = new HashSet<>();
+      if (subMap != null) {
+        for (String path : subMap.keySet()) {
+          if (isParentEntry(path, parentPath)) {
+            validPaths.add(path);
+          }
+        }
+      }
+      return validPaths;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Put new entity into cache.
+   * @param path Mount table path.
+   * @param quotaUsage Corresponding cache value.
+   */
+  public void put(String path, RouterQuotaUsage quotaUsage) {
+    writeLock.lock();
+    try {
+      this.cache.put(path, quotaUsage);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Remove the entity from cache.
+   * @param path Mount table path.
+   */
+  public void remove(String path) {
+    writeLock.lock();
+    try {
+      this.cache.remove(path);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Clean up the cache.
+   */
+  public void clear() {
+    writeLock.lock();
+    try {
+      this.cache.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Check if the quota is set.
+   * @param quota RouterQuotaUsage set in the mount table.
+   * @return True if either the namespace or the storage space quota is set.
+   */
+  public boolean isQuotaSet(RouterQuotaUsage quota) {
+    if (quota != null) {
+      long nsQuota = quota.getQuota();
+      long ssQuota = quota.getSpaceQuota();
+
+      // once nsQuota or ssQuota is set, this mount table has its quota set
+      if (nsQuota != HdfsConstants.QUOTA_DONT_SET
+          || ssQuota != HdfsConstants.QUOTA_DONT_SET) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+}
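
A usage sketch of the ancestor lookup performed by getQuotaUsage() (paths and limits are illustrative assumptions):

  RouterQuotaManager manager = new RouterQuotaManager();
  RouterQuotaUsage quota = new RouterQuotaUsage.Builder()
      .quota(100)        // namespace quota for the mount entry
      .spaceQuota(8192)  // storage space quota in bytes
      .build();
  manager.put("/data", quota);
  // No entry exists for the child path, so the lookup walks up the path
  // hierarchy and returns the quota set on "/data".
  RouterQuotaUsage found = manager.getQuotaUsage("/data/project/file1");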

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
new file mode 100644
index 0000000..9fc93c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the {@link RouterQuotaUsage}
+ * cached information in the {@link Router} and update the corresponding
+ * mount tables in the State Store.
+ */
+public class RouterQuotaUpdateService extends PeriodicService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterQuotaUpdateService.class);
+
+  private MountTableStore mountTableStore;
+  private RouterRpcServer rpcServer;
+  /** Router using this Service. */
+  private final Router router;
+  /** Router Quota manager. */
+  private RouterQuotaManager quotaManager;
+
+  public RouterQuotaUpdateService(final Router router) throws IOException {
+    super(RouterQuotaUpdateService.class.getName());
+    this.router = router;
+    this.rpcServer = router.getRpcServer();
+    this.quotaManager = router.getQuotaManager();
+
+    if (this.quotaManager == null) {
+      throw new IOException("Router quota manager is not initialized.");
+    }
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.setIntervalMs(conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL,
+        RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS));
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void periodicInvoke() {
+    LOG.debug("Start to update quota cache.");
+    try {
+      List<MountTable> updateMountTables = new LinkedList<>();
+      List<MountTable> mountTables = getQuotaSetMountTables();
+      for (MountTable entry : mountTables) {
+        String src = entry.getSourcePath();
+        RouterQuotaUsage oldQuota = entry.getQuota();
+        long nsQuota = oldQuota.getQuota();
+        long ssQuota = oldQuota.getSpaceQuota();
+        // Call RouterRpcServer#getQuotaUsage for getting current quota usage.
+        QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule()
+            .getQuotaUsage(src);
+        // If the quota is not yet set in some subclusters under the
+        // federation path, set the quota for this path.
+        if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) {
+          this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
+        }
+
+        RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
+            currentQuotaUsage);
+        this.quotaManager.put(src, newQuota);
+        entry.setQuota(newQuota);
+
+        // only update mount tables whose quota has changed
+        if (!oldQuota.equals(newQuota)) {
+          updateMountTables.add(entry);
+
+          LOG.debug(
+              "Update quota usage entity of path: {}, nsCount: {},"
+                  + " nsQuota: {}, ssCount: {}, ssQuota: {}.",
+              src, newQuota.getFileAndDirectoryCount(),
+              newQuota.getQuota(), newQuota.getSpaceConsumed(),
+              newQuota.getSpaceQuota());
+        }
+      }
+
+      updateMountTableEntries(updateMountTables);
+    } catch (IOException e) {
+      LOG.error("Quota cache updated error.", e);
+    }
+  }
+
+  /**
+   * Get mount table store management interface.
+   * @return MountTableStore instance.
+   * @throws IOException
+   */
+  private MountTableStore getMountTableStore() throws IOException {
+    if (this.mountTableStore == null) {
+      this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
+          MountTableStore.class);
+      if (this.mountTableStore == null) {
+        throw new IOException("Mount table state store is not available.");
+      }
+    }
+    return this.mountTableStore;
+  }
+
+  /**
+   * Get all the existing mount tables.
+   * @return List of mount tables.
+   * @throws IOException
+   */
+  private List<MountTable> getMountTableEntries() throws IOException {
+    // scan mount tables from root path
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance("/");
+    GetMountTableEntriesResponse getResponse = getMountTableStore()
+        .getMountTableEntries(getRequest);
+    return getResponse.getEntries();
+  }
+
+  /**
+   * Get mount tables whose quota is set.
+   * During this call, the quota usage cache will also be updated by the
+   * quota manager:
+   * 1. Stale paths (entries) will be removed.
+   * 2. Existing entries will be overridden and updated.
+   * @return List of mount tables whose quota is set.
+   * @throws IOException
+   */
+  private List<MountTable> getQuotaSetMountTables() throws IOException {
+    List<MountTable> mountTables = getMountTableEntries();
+    Set<String> stalePaths = new HashSet<>();
+    for (String path : this.quotaManager.getAll()) {
+      stalePaths.add(path);
+    }
+
+    List<MountTable> neededMountTables = new LinkedList<>();
+    for (MountTable entry : mountTables) {
+      // select mount tables whose quota is set
+      if (isQuotaSet(entry)) {
+        neededMountTables.add(entry);
+      }
+
+      // update mount table entries info in quota cache
+      String src = entry.getSourcePath();
+      this.quotaManager.put(src, entry.getQuota());
+      stalePaths.remove(src);
+    }
+
+    // remove stale paths that are currently cached
+    for (String stalePath : stalePaths) {
+      this.quotaManager.remove(stalePath);
+    }
+
+    return neededMountTables;
+  }
+
+  /**
+   * Check if the quota is set in the given MountTable.
+   * @param mountTable Mount table entry.
+   * @return True if the quota is set.
+   */
+  private boolean isQuotaSet(MountTable mountTable) {
+    if (mountTable != null) {
+      return this.quotaManager.isQuotaSet(mountTable.getQuota());
+    }
+    return false;
+  }
+
+  /**
+   * Generate a new quota based on old quota and current quota usage value.
+   * @param oldQuota Old quota stored in State Store.
+   * @param currentQuotaUsage Current quota usage value queried from
+   *        subcluster.
+   * @return A new RouterQuotaUsage.
+   */
+  private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
+      QuotaUsage currentQuotaUsage) {
+    RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
+        .fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount())
+        .quota(oldQuota.getQuota())
+        .spaceConsumed(currentQuotaUsage.getSpaceConsumed())
+        .spaceQuota(oldQuota.getSpaceQuota()).build();
+    return newQuota;
+  }
+
+  /**
+   * Write out updated mount table entries into State Store.
+   * @param updateMountTables Mount tables to be updated.
+   * @throws IOException
+   */
+  private void updateMountTableEntries(List<MountTable> updateMountTables)
+      throws IOException {
+    for (MountTable entry : updateMountTables) {
+      UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
+          .newInstance(entry);
+      getMountTableStore().updateMountTableEntry(updateRequest);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
new file mode 100644
index 0000000..eedd80f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature;
+import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The subclass of {@link QuotaUsage} used in Router-based federation.
+ */
+public final class RouterQuotaUsage extends QuotaUsage {
+
+  /** Default quota usage count. */
+  public static final long QUOTA_USAGE_COUNT_DEFAULT = 0;
+
+  private RouterQuotaUsage(Builder builder) {
+    super(builder);
+  }
+
+  /** Builder for {@link RouterQuotaUsage} instances. */
+  public static class Builder extends QuotaUsage.Builder {
+
+    public RouterQuotaUsage build() {
+      return new RouterQuotaUsage(this);
+    }
+
+    @Override
+    public Builder fileAndDirectoryCount(long count) {
+      super.fileAndDirectoryCount(count);
+      return this;
+    }
+
+    @Override
+    public Builder quota(long quota) {
+      super.quota(quota);
+      return this;
+    }
+
+    @Override
+    public Builder spaceConsumed(long spaceConsumed) {
+      super.spaceConsumed(spaceConsumed);
+      return this;
+    }
+
+    @Override
+    public Builder spaceQuota(long spaceQuota) {
+      super.spaceQuota(spaceQuota);
+      return this;
+    }
+  }
+
+  /**
+   * Verify if the namespace quota is violated once the quota is set. See the
+   * related method {@link DirectoryWithQuotaFeature#verifyNamespaceQuota}.
+   * @throws NSQuotaExceededException
+   */
+  public void verifyNamespaceQuota() throws NSQuotaExceededException {
+    if (Quota.isViolated(getQuota(), getFileAndDirectoryCount())) {
+      throw new NSQuotaExceededException(getQuota(),
+          getFileAndDirectoryCount());
+    }
+  }
+
+  /**
+   * Verify if the storage space quota is violated once the quota is set. See
+   * the related method {@link DirectoryWithQuotaFeature#verifyStoragespaceQuota}.
+   * @throws DSQuotaExceededException
+   */
+  public void verifyStoragespaceQuota() throws DSQuotaExceededException {
+    if (Quota.isViolated(getSpaceQuota(), getSpaceConsumed())) {
+      throw new DSQuotaExceededException(getSpaceQuota(), getSpaceConsumed());
+    }
+  }
+
+  @Override
+  public String toString() {
+    String nsQuota = String.valueOf(getQuota());
+    String nsCount = String.valueOf(getFileAndDirectoryCount());
+    if (getQuota() == HdfsConstants.QUOTA_DONT_SET) {
+      nsQuota = "-";
+      nsCount = "-";
+    }
+
+    String ssQuota = StringUtils.byteDesc(getSpaceQuota());
+    String ssCount = StringUtils.byteDesc(getSpaceConsumed());
+    if (getSpaceQuota() == HdfsConstants.QUOTA_DONT_SET) {
+      ssQuota = "-";
+      ssCount = "-";
+    }
+
+    StringBuilder str = new StringBuilder();
+    str.append("[NsQuota: ").append(nsQuota).append("/")
+        .append(nsCount);
+    str.append(", SsQuota: ").append(ssQuota)
+        .append("/").append(ssCount)
+        .append("]");
+    return str.toString();
+  }
+}
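
A small sketch of the verification helpers above (the counts are illustrative assumptions):

  RouterQuotaUsage usage = new RouterQuotaUsage.Builder()
      .quota(10)                  // namespace quota
      .fileAndDirectoryCount(12)  // current usage exceeds the quota
      .build();
  // 12 > 10, so this call throws NSQuotaExceededException.
  usage.verifyNamespaceQuota();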

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
new file mode 100644
index 0000000..0d298ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -0,0 +1,1021 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A client proxy for Router -> NN communication using the NN ClientProtocol.
+ * <p>
+ * Provides methods for routers to invoke remote ClientProtocol methods
+ * and handle retries/failover.
+ * <ul>
+ * <li>invokeSingle Make a single request to a single namespace
+ * <li>invokeSequential Make a sequential series of requests to multiple
+ * ordered namespaces until a condition is met.
+ * <li>invokeConcurrent Make concurrent requests to multiple namespaces and
+ * return all of the results.
+ * </ul>
+ * Also maintains a cached pool of connections to NNs. Connections are managed
+ * by the ConnectionManager and are unique to each user + NN. The size of the
+ * connection pool can be configured. Larger pools allow for more simultaneous
+ * requests to a single NN from a single user.
+ */
+public class RouterRpcClient {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterRpcClient.class);
+
+
+  /** Router identifier. */
+  private final String routerId;
+
+  /** Interface to identify the active NN for a nameservice or blockpool ID. */
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  /** Connection pool to the Namenodes per user for performance. */
+  private final ConnectionManager connectionManager;
+  /** Service to run asynchronous calls. */
+  private final ExecutorService executorService;
+  /** Retry policy for router -> NN communication. */
+  private final RetryPolicy retryPolicy;
+  /** Optional perf monitor. */
+  private final RouterRpcMonitor rpcMonitor;
+
+  /** Pattern to parse a stack trace line. */
+  private static final Pattern STACK_TRACE_PATTERN =
+      Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
+
+
+  /**
+   * Create a router RPC client to manage remote procedure calls to NNs.
+   *
+   * @param conf HDFS Configuration.
+   * @param identifier Unique identifier for this router.
+   * @param resolver A NN resolver to determine the currently active NN in HA.
+   * @param monitor Optional performance monitor.
+   */
+  public RouterRpcClient(Configuration conf, String identifier,
+      ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
+    this.routerId = identifier;
+
+    this.namenodeResolver = resolver;
+
+    this.connectionManager = new ConnectionManager(conf);
+    this.connectionManager.start();
+
+    int numThreads = conf.getInt(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("RPC Router Client-%d")
+        .build();
+    this.executorService = Executors.newFixedThreadPool(
+        numThreads, threadFactory);
+
+    this.rpcMonitor = monitor;
+
+    int maxFailoverAttempts = conf.getInt(
+        HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
+        HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
+    int maxRetryAttempts = conf.getInt(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
+    int failoverSleepBaseMillis = conf.getInt(
+        HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
+        HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
+    int failoverSleepMaxMillis = conf.getInt(
+        HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
+        HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
+    this.retryPolicy = RetryPolicies.failoverOnNetworkException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts,
+        failoverSleepBaseMillis, failoverSleepMaxMillis);
+  }
+
+  /**
+   * Get the active namenode resolver used by this client.
+   * @return Active namenode resolver.
+   */
+  public ActiveNamenodeResolver getNamenodeResolver() {
+    return this.namenodeResolver;
+  }
+
+  /**
+   * Shutdown the client.
+   */
+  public void shutdown() {
+    if (this.connectionManager != null) {
+      this.connectionManager.close();
+    }
+    if (this.executorService != null) {
+      this.executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * Total number of available sockets between the router and NNs.
+   *
+   * @return Number of namenode clients.
+   */
+  public int getNumConnections() {
+    return this.connectionManager.getNumConnections();
+  }
+
+  /**
+   * Total number of available sockets between the router and NNs.
+   *
+   * @return Number of namenode clients.
+   */
+  public int getNumActiveConnections() {
+    return this.connectionManager.getNumActiveConnections();
+  }
+
+  /**
+   * Total number of open connection pools to a NN. Each connection pool
+   * represents one user + one NN.
+   *
+   * @return Number of connection pools.
+   */
+  public int getNumConnectionPools() {
+    return this.connectionManager.getNumConnectionPools();
+  }
+
+  /**
+   * Number of connections between the router and NNs whose sockets are
+   * being created.
+   *
+   * @return Number of connections waiting to be created.
+   */
+  public int getNumCreatingConnections() {
+    return this.connectionManager.getNumCreatingConnections();
+  }
+
+  /**
+   * JSON representation of the connection pool.
+   *
+   * @return String representation of the JSON.
+   */
+  public String getJSON() {
+    return this.connectionManager.getJSON();
+  }
+
+  /**
+   * Get ClientProtocol proxy client for a NameNode. Each combination of user +
+   * NN must use a unique proxy client. Previously created clients are cached
+   * and stored in a connection pool by the ConnectionManager.
+   *
+   * @param ugi User group information.
+   * @param nsId Nameservice identifier.
+   * @param rpcAddress ClientProtocol RPC server address of the NN.
+   * @return ConnectionContext containing a ClientProtocol proxy client for the
+   *         NN + current user.
+   * @throws IOException If we cannot get a connection to the NameNode.
+   */
+  private ConnectionContext getConnection(
+      UserGroupInformation ugi, String nsId, String rpcAddress)
+          throws IOException {
+    ConnectionContext connection = null;
+    try {
+      // Each proxy holds the UGI info for the current user when it is created.
+      // This cache does not scale very well, one entry per user per namenode,
+      // and may need to be adjusted and/or selectively pruned. The cache is
+      // important due to the excessive overhead of creating a new proxy wrapper
+      // for each individual request.
+
+      // TODO Add tokens from the federated UGI
+      connection = this.connectionManager.getConnection(ugi, rpcAddress);
+      LOG.debug("User {} NN {} is using connection {}",
+          ugi.getUserName(), rpcAddress, connection);
+    } catch (Exception ex) {
+      LOG.error("Cannot open NN client to address: {}", rpcAddress, ex);
+    }
+
+    if (connection == null) {
+      throw new IOException("Cannot get a connection to " + rpcAddress);
+    }
+    return connection;
+  }
+
+  /**
+   * Convert an exception to an IOException.
+   *
+   * For a RemoteException, unwrap it. For an IOException that is not a
+   * RemoteException, return it as-is. For anything else, wrap it in an
+   * IOException.
+   *
+   * @param e Exception to convert into an IOException.
+   * @return Resulting IOException.
+   */
+  private static IOException toIOException(Exception e) {
+    if (e instanceof RemoteException) {
+      return ((RemoteException) e).unwrapRemoteException();
+    }
+    if (e instanceof IOException) {
+      return (IOException)e;
+    }
+    return new IOException(e);
+  }
+
+  /**
+   * Check if the RPC call should be retried.
+   *
+   * @param ioe IOException reported.
+   * @param retryCount Number of retries.
+   * @param nsId Nameservice ID.
+   * @return Retry decision.
+   * @throws IOException Original exception if the retry policy generates one
+   *                     or IOException for no available namenodes.
+   */
+  private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
+      final String nsId) throws IOException {
+    // Check if the cluster is in an unavailable state
+    if (isClusterUnAvailable(nsId)) {
+      // Allow a single retry if the cluster is unavailable
+      if (retryCount == 0) {
+        return RetryDecision.RETRY;
+      } else {
+        throw new IOException("No namenode available under nameservice " + nsId,
+            ioe);
+      }
+    }
+
+    try {
+      final RetryPolicy.RetryAction a =
+          this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
+      return a.action;
+    } catch (Exception ex) {
+      LOG.error("Re-throwing API exception, no more retries", ex);
+      throw toIOException(ex);
+    }
+  }
+
+  /**
+   * Invokes a method against the ClientProtocol proxy server. If a standby
+   * exception is generated by the call to the client, retries using the
+   * alternate server.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param ugi User group information.
+   * @param namenodes A prioritized list of namenodes within the same
+   *                  nameservice.
+   * @param method Remote ClientProtocol method to invoke.
+   * @param params Variable list of parameters matching the method.
+   * @return The result of invoking the method.
+   * @throws IOException
+   */
+  private Object invokeMethod(
+      final UserGroupInformation ugi,
+      final List<? extends FederationNamenodeContext> namenodes,
+      final Method method, final Object... params) throws IOException {
+
+    if (namenodes == null || namenodes.isEmpty()) {
+      throw new IOException("No namenodes to invoke " + method.getName() +
+          " with params " + Arrays.toString(params) + " from " + this.routerId);
+    }
+
+    Object ret = null;
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOp();
+    }
+    boolean failover = false;
+    Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
+    for (FederationNamenodeContext namenode : namenodes) {
+      ConnectionContext connection = null;
+      try {
+        String nsId = namenode.getNameserviceId();
+        String rpcAddress = namenode.getRpcAddress();
+        connection = this.getConnection(ugi, nsId, rpcAddress);
+        ProxyAndInfo<ClientProtocol> client = connection.getClient();
+        ClientProtocol proxy = client.getProxy();
+        ret = invoke(nsId, 0, method, proxy, params);
+        if (failover) {
+          // Succeeded on an alternate server, update the active namenode
+          InetSocketAddress address = client.getAddress();
+          namenodeResolver.updateActiveNamenode(nsId, address);
+        }
+        if (this.rpcMonitor != null) {
+          this.rpcMonitor.proxyOpComplete(true);
+        }
+        return ret;
+      } catch (IOException ioe) {
+        ioes.put(namenode, ioe);
+        if (ioe instanceof StandbyException) {
+          // Fail over indicated by retry policy and/or NN
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpFailureStandby();
+          }
+          failover = true;
+        } else if (ioe instanceof RemoteException) {
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpComplete(true);
+          }
+          // RemoteException returned by NN
+          throw (RemoteException) ioe;
+        } else {
+          // Other communication error, this is a failure
+          // Communication retries are handled by the retry policy
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpFailureCommunicate();
+            this.rpcMonitor.proxyOpComplete(false);
+          }
+          throw ioe;
+        }
+      } finally {
+        if (connection != null) {
+          connection.release();
+        }
+      }
+    }
+    if (this.rpcMonitor != null) {
+      this.rpcMonitor.proxyOpComplete(false);
+    }
+
+    // All namenodes were unavailable or in standby
+    String msg = "No namenode available to invoke " + method.getName() + " " +
+        Arrays.toString(params);
+    LOG.error(msg);
+    for (Entry<FederationNamenodeContext, IOException> entry :
+        ioes.entrySet()) {
+      FederationNamenodeContext namenode = entry.getKey();
+      String nsId = namenode.getNameserviceId();
+      String nnId = namenode.getNamenodeId();
+      String addr = namenode.getRpcAddress();
+      IOException ioe = entry.getValue();
+      if (ioe instanceof StandbyException) {
+        LOG.error("{} {} at {} is in Standby", nsId, nnId, addr);
+      } else {
+        LOG.error("{} {} at {} error: \"{}\"",
+            nsId, nnId, addr, ioe.getMessage());
+      }
+    }
+    throw new StandbyException(msg);
+  }
+
+  /**
+   * Invokes a method on the designated object. Catches exceptions specific to
+   * the invocation.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param nsId Identifier for the namespace
+   * @param retryCount Current retry times
+   * @param method Method to invoke
+   * @param obj Target object for the method
+   * @param params Variable parameters
+   * @return Response from the remote server
+   * @throws IOException
+   */
+  private Object invoke(String nsId, int retryCount, final Method method,
+      final Object obj, final Object... params) throws IOException {
+    try {
+      return method.invoke(obj, params);
+    } catch (IllegalAccessException | IllegalArgumentException e) {
+      LOG.error("Unexpected exception while proxying API", e);
+      return null;
+    } catch (InvocationTargetException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        IOException ioe = (IOException) cause;
+
+        // Check if we should retry.
+        RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
+        if (decision == RetryDecision.RETRY) {
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpRetries();
+          }
+
+          // retry
+          return invoke(nsId, ++retryCount, method, obj, params);
+        } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
+          // Fail over: the invoker looks for StandbyException to trigger it.
+          if (ioe instanceof StandbyException) {
+            throw ioe;
+          } else {
+            throw new StandbyException(ioe.getMessage());
+          }
+        } else {
+          if (ioe instanceof RemoteException) {
+            RemoteException re = (RemoteException) ioe;
+            ioe = re.unwrapRemoteException();
+            ioe = getCleanException(ioe);
+          }
+          throw ioe;
+        }
+      } else {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Check if the cluster for the given nameservice ID is unavailable.
+   * @param nsId Nameservice ID.
+   * @return True if no namenode in the nameservice is reported as active.
+   * @throws IOException If the namenodes for the nameservice cannot be
+   *                     resolved.
+   */
+  private boolean isClusterUnAvailable(String nsId) throws IOException {
+    List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
+        .getNamenodesForNameserviceId(nsId);
+
+    if (nnState != null) {
+      for (FederationNamenodeContext nnContext : nnState) {
+        // Once we find one NN is in active state, we assume this
+        // cluster is available.
+        if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Get a clean copy of the exception. Sometimes the exceptions returned by the
+   * server contain the full stack trace in the message.
+   *
+   * @param ioe Exception to clean up.
+   * @return Copy of the original exception with a clean message.
+   */
+  private static IOException getCleanException(IOException ioe) {
+    IOException ret = null;
+
+    String msg = ioe.getMessage();
+    Throwable cause = ioe.getCause();
+    StackTraceElement[] stackTrace = ioe.getStackTrace();
+
+    // Clean the message by removing the stack trace
+    int index = msg.indexOf("\n");
+    if (index > 0) {
+      String[] msgSplit = msg.split("\n");
+      msg = msgSplit[0];
+
+      // Parse stack trace from the message
+      List<StackTraceElement> elements = new LinkedList<>();
+      for (int i=1; i<msgSplit.length; i++) {
+        String line = msgSplit[i];
+        Matcher matcher = STACK_TRACE_PATTERN.matcher(line);
+        if (matcher.find()) {
+          String declaringClass = matcher.group(1);
+          String methodName = matcher.group(2);
+          String fileName = matcher.group(3);
+          int lineNumber = Integer.parseInt(matcher.group(4));
+          StackTraceElement element = new StackTraceElement(
+              declaringClass, methodName, fileName, lineNumber);
+          elements.add(element);
+        }
+      }
+      stackTrace = elements.toArray(new StackTraceElement[elements.size()]);
+    }
+
+    // Create the new output exception
+    if (ioe instanceof RemoteException) {
+      RemoteException re = (RemoteException)ioe;
+      ret = new RemoteException(re.getClassName(), msg);
+    } else {
+      // Try the simple constructor and initialize the fields
+      Class<? extends IOException> ioeClass = ioe.getClass();
+      try {
+        Constructor<? extends IOException> constructor =
+            ioeClass.getDeclaredConstructor(String.class);
+        ret = constructor.newInstance(msg);
+      } catch (ReflectiveOperationException e) {
+        // If there are errors, just use the input one
+        LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e);
+        ret = ioe;
+      }
+    }
+    if (ret != null) {
+      ret.initCause(cause);
+      ret.setStackTrace(stackTrace);
+    }
+
+    return ret;
+  }
+
+  /**
+   * Invokes a ClientProtocol method. Determines the target nameservice via a
+   * provided block.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param block Block used to determine appropriate nameservice.
+   * @param method The remote method and parameters to invoke.
+   * @return The result of invoking the method.
+   * @throws IOException
+   */
+  public Object invokeSingle(final ExtendedBlock block, RemoteMethod method)
+      throws IOException {
+    String bpId = block.getBlockPoolId();
+    return invokeSingleBlockPool(bpId, method);
+  }
+
+  /**
+   * Invokes a ClientProtocol method. Determines the target nameservice using
+   * the block pool id.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param bpId Block pool identifier.
+   * @param method The remote method and parameters to invoke.
+   * @return The result of invoking the method.
+   * @throws IOException
+   */
+  public Object invokeSingleBlockPool(final String bpId, RemoteMethod method)
+      throws IOException {
+    String nsId = getNameserviceForBlockPoolId(bpId);
+    return invokeSingle(nsId, method);
+  }
+
+  /**
+   * Invokes a ClientProtocol method against the specified namespace.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param nsId Target namespace for the method.
+   * @param method The remote method and parameters to invoke.
+   * @return The result of invoking the method.
+   * @throws IOException
+   */
+  public Object invokeSingle(final String nsId, RemoteMethod method)
+      throws IOException {
+    UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    List<? extends FederationNamenodeContext> nns =
+        getNamenodesForNameservice(nsId);
+    RemoteLocationContext loc = new RemoteLocation(nsId, "/");
+    return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc));
+  }
+
+  /**
+   * Invokes a single proxy call for a single location.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param location RemoteLocation to invoke.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @return The result of invoking the method if successful.
+   * @throws IOException
+   */
+  public Object invokeSingle(final RemoteLocationContext location,
+      RemoteMethod remoteMethod) throws IOException {
+    List<RemoteLocationContext> locations = Collections.singletonList(location);
+    return invokeSequential(locations, remoteMethod);
+  }
+
+  /**
+   * Invokes sequential proxy calls to different locations. Continues to invoke
+   * calls until a call returns without throwing a remote exception.
+   *
+   * @param locations List of locations/nameservices to call sequentially, in
+   *                  priority order.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @return The result of the first call that completes without throwing a
+   *         remote exception.
+   * @throws IOException The first exception encountered if every RPC call
+   *           fails.
+   */
+  public Object invokeSequential(
+      final List<? extends RemoteLocationContext> locations,
+      final RemoteMethod remoteMethod) throws IOException {
+    return invokeSequential(locations, remoteMethod, null, null);
+  }
+
+  /**
+   * Invokes sequential proxy calls to different locations. Continues to invoke
+   * calls until the success condition is met, or until all locations have been
+   * attempted.
+   *
+   * The success condition may be specified by:
+   * <ul>
+   * <li>An expected result class
+   * <li>An expected result value
+   * </ul>
+   *
+   * If no expected result class/values are specified, the success condition is
+   * a call that does not throw a remote exception.
+   *
+   * @param locations List of locations/nameservices to call sequentially, in
+   *                  priority order.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @param expectedResultClass In order to be considered a positive result, the
+   *          return type must be of this class.
+   * @param expectedResultValue In order to be considered a positive result, the
+   *          return value must equal the value of this object.
+   * @return The result of the first call matching the success condition, or,
+   *         if no call matches and none throws, the result of the first RPC
+   *         call executed.
+   * @throws IOException The first remote exception generated if the success
+   *                     condition is not met and at least one call fails.
+   */
+  public <T> T invokeSequential(
+      final List<? extends RemoteLocationContext> locations,
+      final RemoteMethod remoteMethod, Class<T> expectedResultClass,
+      Object expectedResultValue) throws IOException {
+
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = remoteMethod.getMethod();
+    IOException firstThrownException = null;
+    IOException lastThrownException = null;
+    Object firstResult = null;
+    // Invoke in priority order
+    for (final RemoteLocationContext loc : locations) {
+      String ns = loc.getNameserviceId();
+      List<? extends FederationNamenodeContext> namenodes =
+          getNamenodesForNameservice(ns);
+      try {
+        Object[] params = remoteMethod.getParams(loc);
+        Object result = invokeMethod(ugi, namenodes, m, params);
+        // Check if the result is what we expected
+        if (isExpectedClass(expectedResultClass, result) &&
+            isExpectedValue(expectedResultValue, result)) {
+          // Valid result, stop here
+          @SuppressWarnings("unchecked")
+          T ret = (T)result;
+          return ret;
+        }
+        if (firstResult == null) {
+          firstResult = result;
+        }
+      } catch (IOException ioe) {
+        // Record it and move on
+        lastThrownException = ioe;
+        if (firstThrownException == null) {
+          firstThrownException = lastThrownException;
+        }
+      } catch (Exception e) {
+        // Unusual error, ClientProtocol calls always use IOException (or
+        // RemoteException). Re-wrap in IOException for compatibility with
+        // ClientProtocol.
+        LOG.error("Unexpected exception {} proxying {} to {}",
+            e.getClass(), m.getName(), ns, e);
+        lastThrownException = new IOException(
+            "Unexpected exception proxying API " + e.getMessage(), e);
+        if (firstThrownException == null) {
+          firstThrownException = lastThrownException;
+        }
+      }
+    }
+
+    if (firstThrownException != null) {
+      // Re-throw the first exception thrown for compatibility
+      throw firstThrownException;
+    }
+    // Return the first result, whether it is the expected value or not
+    @SuppressWarnings("unchecked")
+    T ret = (T)firstResult;
+    return ret;
+  }
+
+  /**
+   * Checks if a result matches the required result class.
+   *
+   * @param expectedClass Required result class, null to skip the check.
+   * @param result The result to check.
+   * @return True if the result is an instance of the required class or if the
+   *         expected class is null.
+   */
+  private static boolean isExpectedClass(Class<?> expectedClass, Object result) {
+    if (expectedClass == null) {
+      return true;
+    } else if (result == null) {
+      return false;
+    } else {
+      return expectedClass.isInstance(result);
+    }
+  }
+
+  /**
+   * Checks if a result matches the expected value.
+   *
+   * @param expectedValue The expected value, null to skip the check.
+   * @param value The result to check.
+   * @return True if the result is equal to the expected value or if the
+   *         expected value is null.
+   */
+  private static boolean isExpectedValue(Object expectedValue, Object value) {
+    if (expectedValue == null) {
+      return true;
+    } else if (value == null) {
+      return false;
+    } else {
+      return value.equals(expectedValue);
+    }
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. This
+   * overload discards the results.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true, an exception is thrown if any call fails.
+   *          If false, exceptions from individual calls are ignored.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @throws IOException If all the calls throw an exception.
+   */
+  public <T extends RemoteLocationContext> void invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby) throws IOException {
+    invokeConcurrent(locations, method, requireResponse, standby, void.class);
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. Returns a
+   * map of results keyed by location.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true, an exception is thrown if any call fails.
+   *          If false, exceptions are ignored and only the results received
+   *          successfully are returned.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @param clazz Type of the remote return type.
+   * @return Result of invoking the method per subcluster: nsId -> result.
+   * @throws IOException If requireResponse=true and any of the calls throw an
+   *           exception.
+   */
+  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby, Class<R> clazz)
+          throws IOException {
+    return invokeConcurrent(
+        locations, method, requireResponse, standby, -1, clazz);
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. Returns a
+   * map of results keyed by location.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true, an exception is thrown if any call fails.
+   *          If false, exceptions are ignored and only the results received
+   *          successfully are returned.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @param timeOutMs Timeout for the whole set of calls; calls that have not
+   *          finished by then are cancelled.
+   * @param clazz Type of the remote return type.
+   * @return Result of invoking the method per subcluster: nsId -> result.
+   * @throws IOException If requireResponse=true and any of the calls throw an
+   *           exception.
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
+          throws IOException {
+
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
+
+    if (locations.size() == 1) {
+      // Shortcut, just one call
+      T location = locations.iterator().next();
+      String ns = location.getNameserviceId();
+      final List<? extends FederationNamenodeContext> namenodes =
+          getNamenodesForNameservice(ns);
+      Object[] paramList = method.getParams(location);
+      Object result = invokeMethod(ugi, namenodes, m, paramList);
+      return Collections.singletonMap(location, clazz.cast(result));
+    }
+
+    List<T> orderedLocations = new LinkedList<>();
+    Set<Callable<Object>> callables = new HashSet<>();
+    for (final T location : locations) {
+      String nsId = location.getNameserviceId();
+      final List<? extends FederationNamenodeContext> namenodes =
+          getNamenodesForNameservice(nsId);
+      final Object[] paramList = method.getParams(location);
+      if (standby) {
+        // Send the call to every NN (including standby)
+        for (final FederationNamenodeContext nn : namenodes) {
+          String nnId = nn.getNamenodeId();
+          final List<FederationNamenodeContext> nnList =
+              Collections.singletonList(nn);
+          T nnLocation = location;
+          if (location instanceof RemoteLocation) {
+            nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
+          }
+          orderedLocations.add(nnLocation);
+          callables.add(() -> invokeMethod(ugi, nnList, m, paramList));
+        }
+      } else {
+        // Send the call to the prioritized list of NNs for the nameservice
+        orderedLocations.add(location);
+        callables.add(() -> invokeMethod(ugi, namenodes, m, paramList));
+      }
+    }
+
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOp();
+    }
+
+    try {
+      List<Future<Object>> futures = null;
+      if (timeOutMs > 0) {
+        futures = executorService.invokeAll(
+            callables, timeOutMs, TimeUnit.MILLISECONDS);
+      } else {
+        futures = executorService.invokeAll(callables);
+      }
+      Map<T, R> results = new TreeMap<>();
+      Map<T, IOException> exceptions = new TreeMap<>();
+      for (int i=0; i<futures.size(); i++) {
+        T location = orderedLocations.get(i);
+        try {
+          Future<Object> future = futures.get(i);
+          Object result = future.get();
+          results.put(location, clazz.cast(result));
+        } catch (CancellationException ce) {
+          String msg = "Invocation to \"" + location + "\" for \"" +
+              method + "\" timed out";
+          LOG.error(msg);
+          IOException ioe = new IOException(msg);
+          exceptions.put(location, ioe);
+        } catch (ExecutionException ex) {
+          Throwable cause = ex.getCause();
+          LOG.debug("Canot execute {} in {}: {}",
+              m.getName(), location, cause.getMessage());
+
+          // Convert into IOException if needed
+          IOException ioe = null;
+          if (cause instanceof IOException) {
+            ioe = (IOException) cause;
+          } else {
+            ioe = new IOException("Unhandled exception while proxying API " +
+                m.getName() + ": " + cause.getMessage(), cause);
+          }
+
+          // A response from all servers is required; propagate this error.
+          if (requireResponse) {
+            throw ioe;
+          }
+
+          // Store the exceptions
+          exceptions.put(location, ioe);
+        }
+      }
+
+      // Throw the exception for the first location if there are no results
+      if (results.isEmpty()) {
+        T location = orderedLocations.get(0);
+        IOException ioe = exceptions.get(location);
+        if (ioe != null) {
+          throw ioe;
+        }
+      }
+
+      return results;
+    } catch (InterruptedException ex) {
+      LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
+      throw new IOException(
+          "Unexpected error while invoking API " + ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Get a prioritized list of NNs that share the same nameservice ID (in the
+   * same namespace). NNs that are reported as ACTIVE will be first in the list.
+   *
+   * @param nsId The nameservice ID for the namespace.
+   * @return A prioritized list of NNs to use for communication.
+   * @throws IOException If a NN cannot be located for the nameservice ID.
+   */
+  private List<? extends FederationNamenodeContext> getNamenodesForNameservice(
+      final String nsId) throws IOException {
+
+    final List<? extends FederationNamenodeContext> namenodes =
+        namenodeResolver.getNamenodesForNameserviceId(nsId);
+
+    if (namenodes == null || namenodes.isEmpty()) {
+      throw new IOException("Cannot locate a registered namenode for " + nsId +
+          " from " + this.routerId);
+    }
+    return namenodes;
+  }
+
+  /**
+   * Get a prioritized list of NNs that share the same block pool ID (in the
+   * same namespace). NNs that are reported as ACTIVE will be first in the list.
+   *
+   * @param bpId The block pool ID for the namespace.
+   * @return A prioritized list of NNs to use for communication.
+   * @throws IOException If a NN cannot be located for the block pool ID.
+   */
+  private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
+      final String bpId) throws IOException {
+
+    List<? extends FederationNamenodeContext> namenodes =
+        namenodeResolver.getNamenodesForBlockPoolId(bpId);
+
+    if (namenodes == null || namenodes.isEmpty()) {
+      throw new IOException("Cannot locate a registered namenode for " + bpId +
+          " from " + this.routerId);
+    }
+    return namenodes;
+  }
+
+  /**
+   * Get the nameservice identifier for a block pool.
+   *
+   * @param bpId Identifier of the block pool.
+   * @return Nameservice identifier.
+   * @throws IOException If a NN cannot be located for the block pool ID.
+   */
+  private String getNameserviceForBlockPoolId(final String bpId)
+      throws IOException {
+    List<? extends FederationNamenodeContext> namenodes =
+        getNamenodesForBlockPoolId(bpId);
+    FederationNamenodeContext namenode = namenodes.get(0);
+    return namenode.getNameserviceId();
+  }
+}
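
For readers following the patch, the sequential and concurrent invocation
paths above are easiest to see from the caller's side. The sketch below is
not part of this commit (the real caller in this module is RouterRpcServer).
It assumes it lives in the org.apache.hadoop.hdfs.server.federation.router
package so that RouterRpcClient, RemoteMethod and RemoteParam resolve, that
those helpers keep the signatures used elsewhere in this patch, and that
ClientProtocol#getFileInfo is just an illustrative choice of remote method:

  package org.apache.hadoop.hdfs.server.federation.router;

  import java.io.IOException;
  import java.util.List;
  import java.util.Map;

  import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
  import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;

  public class RouterInvokeSketch {

    /** Try each mount location in priority order; stop at the first match. */
    static HdfsFileStatus getFileInfo(RouterRpcClient rpcClient,
        List<RemoteLocation> locations) throws IOException {
      // RemoteParam is substituted with each location's destination path.
      RemoteMethod method = new RemoteMethod("getFileInfo",
          new Class<?>[] {String.class}, new RemoteParam());
      // Returns the first HdfsFileStatus found; if no namespace matches and
      // at least one call failed, the first exception is re-thrown.
      return rpcClient.invokeSequential(
          locations, method, HdfsFileStatus.class, null);
    }

    /** Fan the same call out to every location, one result per location. */
    static Map<RemoteLocation, HdfsFileStatus> getFileInfoEverywhere(
        RouterRpcClient rpcClient, List<RemoteLocation> locations)
            throws IOException {
      RemoteMethod method = new RemoteMethod("getFileInfo",
          new Class<?>[] {String.class}, new RemoteParam());
      // requireResponse=true: fail if any subcluster fails;
      // standby=false: only query the active namenodes.
      return rpcClient.invokeConcurrent(
          locations, method, true, false, HdfsFileStatus.class);
    }
  }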

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
new file mode 100644
index 0000000..df9aa11
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+
+/**
+ * Metrics and monitoring interface for the router RPC server. Allows pluggable
+ * diagnostics and monitoring services to be attached.
+ */
+public interface RouterRpcMonitor {
+
+  /**
+   * Initialize the monitor.
+   * @param conf Configuration for the monitor.
+   * @param server RPC server.
+   * @param store State Store.
+   */
+  void init(
+      Configuration conf, RouterRpcServer server, StateStoreService store);
+
+  /**
+   * Get Router RPC metrics info.
+   * @return The instance of FederationRPCMetrics.
+   */
+  FederationRPCMetrics getRPCMetrics();
+
+  /**
+   * Close the monitor.
+   */
+  void close();
+
+  /**
+   * Start processing an operation on the Router.
+   */
+  void startOp();
+
+  /**
+   * Start proxying an operation to the Namenode.
+   * @return Id of the thread doing the proxying.
+   */
+  long proxyOp();
+
+  /**
+   * Mark a proxy operation as completed.
+   * @param success If the operation was successful.
+   */
+  void proxyOpComplete(boolean success);
+
+  /**
+   * Failed to proxy an operation to a Namenode because it was in standby.
+   */
+  void proxyOpFailureStandby();
+
+  /**
+   * Failed to proxy an operation to a Namenode because of an unexpected
+   * exception.
+   */
+  void proxyOpFailureCommunicate();
+
+  /**
+   * Failed to proxy an operation because it is not implemented.
+   */
+  void proxyOpNotImplemented();
+
+  /**
+   * A proxy operation to a Namenode is retried because of an unexpected
+   * exception.
+   */
+  void proxyOpRetries();
+
+  /**
+   * If the Router cannot contact the State Store during an operation.
+   */
+  void routerFailureStateStore();
+
+  /**
+   * If the Router is in safe mode.
+   */
+  void routerFailureSafemode();
+
+  /**
+   * If a path is locked.
+   */
+  void routerFailureLocked();
+
+  /**
+   * If a path is in a read-only mount point.
+   */
+  void routerFailureReadOnly();
+}
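
Because RouterRpcMonitor is pluggable, any implementation can be attached to
the Router's RPC server. The following no-op sketch is not part of this
commit and is only meant to show the shape of the contract; it assumes the
same package as the interface so that the collaborating types resolve:

  package org.apache.hadoop.hdfs.server.federation.router;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
  import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;

  /** Hypothetical monitor that accepts every callback and keeps no metrics. */
  public class NoOpRouterRpcMonitor implements RouterRpcMonitor {

    @Override
    public void init(Configuration conf, RouterRpcServer server,
        StateStoreService store) {
      // Nothing to initialize; a real monitor would wire up its metrics here.
    }

    @Override
    public FederationRPCMetrics getRPCMetrics() {
      return null; // No metrics are collected.
    }

    @Override
    public void close() {}

    @Override
    public void startOp() {}

    @Override
    public long proxyOp() {
      // Report the proxying thread so callers can correlate start and end.
      return Thread.currentThread().getId();
    }

    @Override
    public void proxyOpComplete(boolean success) {}

    @Override
    public void proxyOpFailureStandby() {}

    @Override
    public void proxyOpFailureCommunicate() {}

    @Override
    public void proxyOpNotImplemented() {}

    @Override
    public void proxyOpRetries() {}

    @Override
    public void routerFailureStateStore() {}

    @Override
    public void routerFailureSafemode() {}

    @Override
    public void routerFailureLocked() {}

    @Override
    public void routerFailureReadOnly() {}
  }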

