hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [16/37] hadoop git commit: HDFS-13215. RBF: Move Router to its own module.
Date Tue, 20 Mar 2018 06:18:05 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
deleted file mode 100644
index 3508eab..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.resolver.order;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.net.HostAndPort;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
-import org.apache.hadoop.hdfs.server.federation.router.Router;
-import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
-import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
-import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The local subcluster (where the writer is) should be tried first. The writer
- * is defined from the RPC query received in the RPC server.
- */
-public class LocalResolver implements OrderedResolver {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(LocalResolver.class);
-
-  /** Configuration key to set the minimum time to update the local cache.*/
-  public static final String MIN_UPDATE_PERIOD_KEY =
-      DFSConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
-  /** 10 seconds by default. */
-  private static final long MIN_UPDATE_PERIOD_DEFAULT =
-      TimeUnit.SECONDS.toMillis(10);
-
-
-  /** Router service. */
-  private final Router router;
-  /** Minimum update time. */
-  private final long minUpdateTime;
-
-  /** Node IP -> Subcluster. */
-  private Map<String, String> nodeSubcluster = null;
-  /** Last time the subcluster map was updated. */
-  private long lastUpdated;
-
-
-  public LocalResolver(final Configuration conf, final Router routerService) {
-    this.minUpdateTime = conf.getTimeDuration(
-        MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    this.router = routerService;
-  }
-
-  /**
-   * Get the local name space. This relies on the RPC Server to get the address
-   * from the client.
-   *
-   * TODO we only support DN and NN locations, we need to add others like
-   * Resource Managers.
-   *
-   * @param path Path ignored by this policy.
-   * @param loc Federated location with multiple destinations.
-   * @return Local name space. Null if we don't know about this machine.
-   */
-  @Override
-  public String getFirstNamespace(final String path, final PathLocation loc) {
-    String localSubcluster = null;
-    String clientAddr = getClientAddr();
-    Map<String, String> nodeToSubcluster = getSubclusterMappings();
-    if (nodeToSubcluster != null) {
-      localSubcluster = nodeToSubcluster.get(clientAddr);
-      if (localSubcluster != null) {
-        LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
-      } else {
-        LOG.error("Cannot get local namespace for {}", clientAddr);
-      }
-    } else {
-      LOG.error("Cannot get node mapping when resolving {} at {} from {}",
-          path, loc, clientAddr);
-    }
-    return localSubcluster;
-  }
-
-  @VisibleForTesting
-  String getClientAddr() {
-    return Server.getRemoteAddress();
-  }
-
-  /**
-   * Get the mapping from nodes to subcluster. It gets this mapping from the
-   * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
-   * too many calls. The cache might be updated asynchronously to reduce
-   * latency.
-   *
-   * @return Node IP -> Subcluster.
-   */
-  @VisibleForTesting
-  synchronized Map<String, String> getSubclusterMappings() {
-    if (nodeSubcluster == null ||
-        (monotonicNow() - lastUpdated) > minUpdateTime) {
-      // Fetch the mapping asynchronously
-      Thread updater = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          Map<String, String> mapping = new HashMap<>();
-
-          Map<String, String> dnSubcluster = getDatanodesSubcluster();
-          if (dnSubcluster != null) {
-            mapping.putAll(dnSubcluster);
-          }
-
-          Map<String, String> nnSubcluster = getNamenodesSubcluster();
-          if (nnSubcluster != null) {
-            mapping.putAll(nnSubcluster);
-          }
-          nodeSubcluster = mapping;
-          lastUpdated = monotonicNow();
-        }
-      });
-      updater.start();
-
-      // Wait until initialized
-      if (nodeSubcluster == null) {
-        try {
-          LOG.debug("Wait to get the mapping for the first time");
-          updater.join();
-        } catch (InterruptedException e) {
-          LOG.error("Cannot wait for the updater to finish");
-        }
-      }
-    }
-    return nodeSubcluster;
-  }
-
-  /**
-   * Get the Datanode mapping from the subclusters from the Namenodes. This
-   * needs to be done as a privileged action to use the user for the Router and
-   * not the one from the client in the RPC call.
-   *
-   * @return DN IP -> Subcluster.
-   */
-  private Map<String, String> getDatanodesSubcluster() {
-
-    final RouterRpcServer rpcServer = getRpcServer();
-    if (rpcServer == null) {
-      LOG.error("Cannot access the Router RPC server");
-      return null;
-    }
-
-    Map<String, String> ret = new HashMap<>();
-    try {
-      // We need to get the DNs as a privileged user
-      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-      Map<String, DatanodeStorageReport[]> dnMap = loginUser.doAs(
-          new PrivilegedAction<Map<String, DatanodeStorageReport[]>>() {
-            @Override
-            public Map<String, DatanodeStorageReport[]> run() {
-              try {
-                return rpcServer.getDatanodeStorageReportMap(
-                    DatanodeReportType.ALL);
-              } catch (IOException e) {
-                LOG.error("Cannot get the datanodes from the RPC server", e);
-                return null;
-              }
-            }
-          });
-      for (Entry<String, DatanodeStorageReport[]> entry : dnMap.entrySet()) {
-        String nsId = entry.getKey();
-        DatanodeStorageReport[] dns = entry.getValue();
-        for (DatanodeStorageReport dn : dns) {
-          DatanodeInfo dnInfo = dn.getDatanodeInfo();
-          String ipAddr = dnInfo.getIpAddr();
-          ret.put(ipAddr, nsId);
-        }
-      }
-    } catch (IOException e) {
-      LOG.error("Cannot get Datanodes from the Namenodes: {}", e.getMessage());
-    }
-    return ret;
-  }
-
-  /**
-   * Get the Namenode mapping from the subclusters from the Membership store. As
-   * the Routers are usually co-located with Namenodes, we also check for the
-   * local address for this Router here.
-   *
-   * @return NN IP -> Subcluster.
-   */
-  private Map<String, String> getNamenodesSubcluster() {
-
-    final MembershipStore membershipStore = getMembershipStore();
-    if (membershipStore == null) {
-      LOG.error("Cannot access the Membership store");
-      return null;
-    }
-
-    // Manage requests from this hostname (127.0.0.1)
-    String localIp = "127.0.0.1";
-    String localHostname = localIp;
-    try {
-      localHostname = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException e) {
-      LOG.error("Cannot get local host name");
-    }
-
-    Map<String, String> ret = new HashMap<>();
-    try {
-      // Get the values from the store
-      GetNamenodeRegistrationsRequest request =
-          GetNamenodeRegistrationsRequest.newInstance();
-      GetNamenodeRegistrationsResponse response =
-          membershipStore.getNamenodeRegistrations(request);
-      final List<MembershipState> nns = response.getNamenodeMemberships();
-      for (MembershipState nn : nns) {
-        try {
-          String nsId = nn.getNameserviceId();
-          String rpcAddress = nn.getRpcAddress();
-          String hostname = HostAndPort.fromString(rpcAddress).getHostText();
-          ret.put(hostname, nsId);
-          if (hostname.equals(localHostname)) {
-            ret.put(localIp, nsId);
-          }
-
-          InetAddress addr = InetAddress.getByName(hostname);
-          String ipAddr = addr.getHostAddress();
-          ret.put(ipAddr, nsId);
-        } catch (Exception e) {
-          LOG.error("Cannot get address for {}: {}", nn, e.getMessage());
-        }
-      }
-    } catch (IOException ioe) {
-      LOG.error("Cannot get Namenodes from the State Store: {}",
-          ioe.getMessage());
-    }
-    return ret;
-  }
-
-  /**
-   * Get the Router RPC server.
-   *
-   * @return Router RPC server. Null if not possible.
-   */
-  private RouterRpcServer getRpcServer() {
-    if (this.router == null) {
-      return null;
-    }
-    return router.getRpcServer();
-  }
-
-  /**
-   * Get the Membership store.
-   *
-   * @return Membership store.
-   */
-  private MembershipStore getMembershipStore() {
-    StateStoreService stateStore = router.getStateStore();
-    if (stateStore == null) {
-      return null;
-    }
-    return stateStore.getRegisteredRecordStore(MembershipStore.class);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
deleted file mode 100644
index 3a3ccf7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.resolver.order;
-
-import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
-
-
-/**
- * Policy that decides which should be the first location accessed given
- * multiple destinations. Implementations pick one namespace out of the ones
- * available in a federated location.
- */
-public interface OrderedResolver {
-
-  /**
-   * Get the first namespace based on this resolver approach.
-   *
-   * @param path Path to check. Implementations may ignore it.
-   * @param loc Federated location with multiple destinations.
-   * @return First namespace out of the locations. Implementations may return
-   *         null when they cannot decide.
-   */
-  String getFirstNamespace(String path, PathLocation loc);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
deleted file mode 100644
index 022aa48..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.resolver.order;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Order the destinations randomly. The first namespace is picked uniformly
- * among the namespaces of the federated location.
- */
-public class RandomResolver implements OrderedResolver {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RandomResolver.class);
-
-
-  /** Random number generator. */
-  private static final Random RANDOM = new Random();
-
-  /**
-   * Get a random name space from the path.
-   *
-   * @param path Path ignored by this policy.
-   * @param loc Federated location with multiple destinations.
-   * @return Random name space. Null if the location is null or it has no
-   *         namespaces.
-   */
-  @Override
-  public String getFirstNamespace(final String path, final PathLocation loc) {
-    if (loc == null) {
-      return null;
-    }
-    Set<String> namespaces = loc.getNamespaces();
-    if (namespaces == null || namespaces.isEmpty()) {
-      LOG.error("Cannot get namespaces for {}", loc);
-      return null;
-    }
-    // A set has no positional access: copy it to pick an element by index
-    List<String> nssList = new ArrayList<>(namespaces);
-    int index = RANDOM.nextInt(nssList.size());
-    return nssList.get(index);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java
deleted file mode 100644
index f90152f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * A federated location can be resolved to multiple subclusters. This package
- * takes care of the order in which these multiple destinations should be used.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-
-package org.apache.hadoop.hdfs.server.federation.resolver.order;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java
deleted file mode 100644
index d8be9e3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * The resolver package contains independent data resolvers used in HDFS
- * federation. The data resolvers collect data from the cluster, including from
- * the state store. The resolvers expose APIs used by HDFS federation to collect
- * aggregated, cached data for use in Real-time request processing. The
- * resolvers are perf-sensitive and are used in the flow of the
- * {@link RouterRpcServer} request path.
- * <p>
- * The principal resolvers are:
- * <ul>
- * <li>{@link ActiveNamenodeResolver} Real-time interface for locating the most
- * recently active NN for a nameservice.
- * <li>{@link FileSubclusterResolver} Real-time interface for determining the NN
- * and local file path for a given file/folder based on the global namespace
- * path.
- * </ul>
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-package org.apache.hadoop.hdfs.server.federation.resolver;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
deleted file mode 100644
index 1d27b51..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.ipc.RPC;
-
-/**
- * Context to track a connection in a {@link ConnectionPool}. When a client uses
- * a connection, it increments a counter to mark it as active. Once the client
- * is done with the connection, it decreases the counter. It also takes care of
- * closing the connection once it is not active.
- */
-public class ConnectionContext {
-
-  /** Client for the connection. */
-  private final ProxyAndInfo<ClientProtocol> client;
-  /** How many threads are using this connection. */
-  private int numThreads = 0;
-  /** If the connection is closed. */
-  private boolean closed = false;
-
-
-  /**
-   * Create a context for a connection that nobody is using yet.
-   *
-   * @param connection Client proxy tracked by this context.
-   */
-  public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) {
-    this.client = connection;
-  }
-
-  /**
-   * Check if the connection is active.
-   *
-   * @return True if the connection is active.
-   */
-  public synchronized boolean isActive() {
-    return this.numThreads > 0;
-  }
-
-  /**
-   * Check if the connection is closed.
-   *
-   * @return If the connection is closed.
-   */
-  public synchronized boolean isClosed() {
-    return this.closed;
-  }
-
-  /**
-   * Check if the connection can be used. It checks if the connection is used by
-   * another thread or already closed.
-   *
-   * @return True if the connection can be used.
-   */
-  public synchronized boolean isUsable() {
-    return !isActive() && !isClosed();
-  }
-
-  /**
-   * Get the connection client. This marks the connection as used by one more
-   * thread, so every call must be paired with a call to {@link #release()}.
-   *
-   * @return Connection client.
-   */
-  public synchronized ProxyAndInfo<ClientProtocol> getClient() {
-    this.numThreads++;
-    return this.client;
-  }
-
-  /**
-   * Release this connection. If the connection was closed, close the proxy.
-   * Otherwise, mark the connection as not used by us anymore. A release
-   * without a matching {@link #getClient()} is ignored.
-   */
-  public synchronized void release() {
-    if (this.numThreads <= 0) {
-      // Unbalanced release: a negative counter would make isActive() report
-      // false while a later borrower is still using the connection
-      return;
-    }
-    if (--this.numThreads == 0 && this.closed) {
-      close();
-    }
-  }
-
-  /**
-   * We will not use this connection anymore. If it's not being used, we close
-   * it. Otherwise, we let release() do it once we are done with it.
-   */
-  public synchronized void close() {
-    this.closed = true;
-    if (this.numThreads == 0) {
-      ClientProtocol proxy = this.client.getProxy();
-      // Nobody should be using this anymore so it should close right away
-      RPC.stopProxy(proxy);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
deleted file mode 100644
index 594f489..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
-import org.eclipse.jetty.util.ajax.JSON;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a pool of connections for the {@link Router} to be able to open
- * many connections to many Namenodes.
- */
-public class ConnectionManager {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ConnectionManager.class);
-
-  /** Number of parallel new connections to create. */
-  protected static final int MAX_NEW_CONNECTIONS = 100;
-
-  /** Minimum amount of active connections: 50%. */
-  protected static final float MIN_ACTIVE_RATIO = 0.5f;
-
-
-  /** Configuration for the connection manager, pool and sockets. */
-  private final Configuration conf;
-
-  /** Min number of connections per user + nn. */
-  private final int minSize = 1;
-  /** Max number of connections per user + nn. */
-  private final int maxSize;
-
-  /** How often we close a pool for a particular user + nn. */
-  private final long poolCleanupPeriodMs;
-  /** How often we close a connection in a pool. */
-  private final long connectionCleanupPeriodMs;
-
-  /** Map of connection pools, one pool per user + NN. */
-  private final Map<ConnectionPoolId, ConnectionPool> pools;
-  /** Lock for accessing pools. */
-  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-  private final Lock readLock = readWriteLock.readLock();
-  private final Lock writeLock = readWriteLock.writeLock();
-
-  /** Queue for creating new connections. */
-  private final BlockingQueue<ConnectionPool> creatorQueue =
-      new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS);
-  /** Create new connections asynchronously. */
-  private final ConnectionCreator creator;
-  /** Periodic executor to remove stale connection pools. */
-  private final ScheduledThreadPoolExecutor cleaner =
-      new ScheduledThreadPoolExecutor(1);
-
-  /**
-   * If the connection manager is running. Volatile because start() and close()
-   * write it while getConnection() reads it without holding any lock.
-   */
-  private volatile boolean running = false;
-
-
-  /**
-   * Creates a proxy client connection pool manager.
-   *
-   * @param config Configuration for the connections.
-   */
-  public ConnectionManager(Configuration config) {
-    this.conf = config;
-
-    // Configure minimum and maximum connection pools
-    this.maxSize = this.conf.getInt(
-        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
-        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
-
-    // Map with the connections indexed by UGI and Namenode
-    this.pools = new HashMap<>();
-
-    // Create connections in a thread asynchronously
-    this.creator = new ConnectionCreator(creatorQueue);
-    this.creator.setDaemon(true);
-
-    // Cleanup periods
-    this.poolCleanupPeriodMs = this.conf.getLong(
-        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN,
-        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
-    LOG.info("Cleaning connection pools every {} seconds",
-        TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs));
-    this.connectionCleanupPeriodMs = this.conf.getLong(
-        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS,
-        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
-    LOG.info("Cleaning connections every {} seconds",
-        TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));
-  }
-
-  /**
-   * Start the connection manager: launch the asynchronous connection creator
-   * and schedule the periodic cleanup of pools and connections.
-   */
-  public void start() {
-    // Background thread that serves the creation queue
-    this.creator.start();
-
-    // The cleanup task runs at the shorter of the two configured periods
-    final long cleanupPeriodMs =
-        Math.min(poolCleanupPeriodMs, connectionCleanupPeriodMs);
-    LOG.info("Cleaning every {} seconds",
-        TimeUnit.MILLISECONDS.toSeconds(cleanupPeriodMs));
-    final Runnable cleanupTask = new CleanupTask();
-    this.cleaner.scheduleAtFixedRate(
-        cleanupTask, 0, cleanupPeriodMs, TimeUnit.MILLISECONDS);
-
-    // From now on getConnection() serves requests
-    this.running = true;
-  }
-
-  /**
-   * Stop the connection manager: shut down the creator thread and the cleanup
-   * executor, then close and forget every pool.
-   */
-  public void close() {
-    this.creator.shutdown();
-    this.cleaner.shutdown();
-    this.running = false;
-
-    writeLock.lock();
-    try {
-      for (Entry<ConnectionPoolId, ConnectionPool> poolEntry :
-          this.pools.entrySet()) {
-        poolEntry.getValue().close();
-      }
-      this.pools.clear();
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /**
-   * Fetches the next available proxy client in the pool, creating the pool for
-   * this user and Namenode on first use. When the connection obtained is
-   * missing or not usable, the pool is queued so that another connection gets
-   * created asynchronously.
-   *
-   * @param ugi User group information.
-   * @param nnAddress Namenode address for the connection.
-   * @return Proxy client to connect to nnId as UGI; null if the manager is not
-   *         running or no open connection is available.
-   * @throws IOException If the connection cannot be obtained.
-   */
-  public ConnectionContext getConnection(
-      UserGroupInformation ugi, String nnAddress) throws IOException {
-
-    // Refuse requests while the manager is stopped
-    if (!this.running) {
-      LOG.error(
-          "Cannot get a connection to {} because the manager isn't running",
-          nnAddress);
-      return null;
-    }
-
-    // First look the pool up under the read lock
-    final ConnectionPoolId poolId = new ConnectionPoolId(ugi, nnAddress);
-    ConnectionPool pool;
-    readLock.lock();
-    try {
-      pool = this.pools.get(poolId);
-    } finally {
-      readLock.unlock();
-    }
-
-    // Not there: create it under the write lock, checking again in case
-    // another thread added it in the meantime
-    if (pool == null) {
-      writeLock.lock();
-      try {
-        pool = this.pools.get(poolId);
-        if (pool == null) {
-          pool = new ConnectionPool(
-              this.conf, nnAddress, ugi, this.minSize, this.maxSize);
-          this.pools.put(poolId, pool);
-        }
-      } finally {
-        writeLock.unlock();
-      }
-    }
-
-    ConnectionContext conn = pool.getConnection();
-
-    // Request one more connection for the pool if this one wasn't usable
-    final boolean needsNewConnection = conn == null || !conn.isUsable();
-    if (needsNewConnection && !this.creatorQueue.offer(pool)) {
-      LOG.error("Cannot add more than {} connections at the same time",
-          MAX_NEW_CONNECTIONS);
-    }
-
-    if (conn == null || !conn.isClosed()) {
-      return conn;
-    }
-    LOG.error("We got a closed connection from {}", pool);
-    return null;
-  }
-
-  /**
-   * Get the number of connection pools currently registered.
-   *
-   * @return Number of connection pools.
-   */
-  public int getNumConnectionPools() {
-    final int numPools;
-    readLock.lock();
-    try {
-      numPools = this.pools.size();
-    } finally {
-      readLock.unlock();
-    }
-    return numPools;
-  }
-
-  /**
-   * Get number of open connections, added up over all the pools.
-   *
-   * @return Number of open connections.
-   */
-  public int getNumConnections() {
-    readLock.lock();
-    try {
-      int sum = 0;
-      for (Entry<ConnectionPoolId, ConnectionPool> poolEntry :
-          this.pools.entrySet()) {
-        sum += poolEntry.getValue().getNumConnections();
-      }
-      return sum;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /**
-   * Get number of active connections, added up over all the pools.
-   *
-   * @return Number of active connections.
-   */
-  public int getNumActiveConnections() {
-    readLock.lock();
-    try {
-      int sum = 0;
-      for (Entry<ConnectionPoolId, ConnectionPool> poolEntry :
-          this.pools.entrySet()) {
-        sum += poolEntry.getValue().getNumActiveConnections();
-      }
-      return sum;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /**
-   * Get the number of connections to be created. This is the current size of
-   * the creator queue, where each entry is a pool waiting for one more
-   * connection.
-   *
-   * @return Number of connections to be created.
-   */
-  public int getNumCreatingConnections() {
-    return this.creatorQueue.size();
-  }
-
-  /**
-   * Get a JSON representation of the connection pools: a map from the pool
-   * identifier to the JSON of that pool, sorted by identifier.
-   *
-   * @return JSON representation of all the connection pools.
-   */
-  public String getJSON() {
-    final Map<String, String> poolsInfo = new TreeMap<>();
-    readLock.lock();
-    try {
-      for (Entry<ConnectionPoolId, ConnectionPool> poolEntry :
-          this.pools.entrySet()) {
-        String poolName = poolEntry.getKey().toString();
-        String poolJson = poolEntry.getValue().getJSON();
-        poolsInfo.put(poolName, poolJson);
-      }
-    } finally {
-      readLock.unlock();
-    }
-    return JSON.toString(poolsInfo);
-  }
-
-  /**
-   * Get the map of connection pools. This returns the internal map itself, not
-   * a copy, and takes no lock.
-   *
-   * @return Connection pools indexed by user + NN.
-   */
-  @VisibleForTesting
-  Map<ConnectionPoolId, ConnectionPool> getPools() {
-    return this.pools;
-  }
-
-  /**
-   * Clean the unused connections for this pool.
-   *
-   * @param pool Connection pool to cleanup.
-   */
-  @VisibleForTesting
-  void cleanup(ConnectionPool pool) {
-    if (pool.getNumConnections() > pool.getMinSize()) {
-      // Check if the pool hasn't been active in a while or not 50% are used
-      long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
-      int total = pool.getNumConnections();
-      int active = pool.getNumActiveConnections();
-      if (timeSinceLastActive > connectionCleanupPeriodMs ||
-          active < MIN_ACTIVE_RATIO * total) {
-        // Remove and close 1 connection
-        List<ConnectionContext> conns = pool.removeConnections(1);
-        for (ConnectionContext conn : conns) {
-          conn.close();
-        }
-        LOG.debug("Removed connection {} used {} seconds ago. " +
-                "Pool has {}/{} connections", pool.getConnectionPoolId(),
-            TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
-            pool.getNumConnections(), pool.getMaxSize());
-      }
-    }
-  }
-
-  /**
-   * Periodic task that closes and removes the pools not used for longer than
-   * the pool cleanup period and trims the connections of the other pools.
-   */
-  private class CleanupTask implements Runnable {
-
-    @Override
-    public void run() {
-      final long now = Time.now();
-      final List<ConnectionPoolId> stalePoolIds = new LinkedList<>();
-
-      // First pass under the read lock: close stale pools, trim the others.
-      // NOTE(review): a stale pool is closed here but only removed from the
-      // map in the second pass, so getConnection() can still find it in
-      // between; confirm callers cope with the closed connection.
-      readLock.lock();
-      try {
-        for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
-          final ConnectionPool pool = entry.getValue();
-          final long lastActive = pool.getLastActiveTime();
-          final boolean stale =
-              lastActive > 0 && now > (lastActive + poolCleanupPeriodMs);
-          if (!stale) {
-            // Keep this pool but clean connections inside
-            LOG.debug("Cleaning up {}", pool);
-            cleanup(pool);
-            continue;
-          }
-          LOG.debug("Closing and removing stale pool {}", pool);
-          pool.close();
-          stalePoolIds.add(entry.getKey());
-        }
-      } finally {
-        readLock.unlock();
-      }
-
-      // Second pass under the write lock: forget the pools closed above
-      if (stalePoolIds.isEmpty()) {
-        return;
-      }
-      writeLock.lock();
-      try {
-        for (ConnectionPoolId stalePoolId : stalePoolIds) {
-          pools.remove(stalePoolId);
-        }
-      } finally {
-        writeLock.unlock();
-      }
-    }
-  }
-
-  /**
-   * Thread that creates connections asynchronously. It takes pools from the
-   * queue and adds one connection to each, as long as the pool is below its
-   * maximum size and at least 50% of its connections are active.
-   */
-  private static class ConnectionCreator extends Thread {
-    /**
-     * If the creator is running. Volatile because shutdown() is invoked from
-     * a thread other than the one executing run().
-     */
-    private volatile boolean running = true;
-    /** Queue to push work to. */
-    private final BlockingQueue<ConnectionPool> queue;
-
-    ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
-      super("Connection creator");
-      this.queue = blockingQueue;
-    }
-
-    @Override
-    public void run() {
-      while (this.running) {
-        try {
-          ConnectionPool pool = this.queue.take();
-          try {
-            // Use one reading of the pool size for both checks
-            int total = pool.getNumConnections();
-            int active = pool.getNumActiveConnections();
-            if (total < pool.getMaxSize() &&
-                active >= MIN_ACTIVE_RATIO * total) {
-              ConnectionContext conn = pool.newConnection();
-              pool.addConnection(conn);
-            } else {
-              LOG.debug("Cannot add more than {} connections to {}",
-                  pool.getMaxSize(), pool);
-            }
-          } catch (IOException e) {
-            LOG.error("Cannot create a new connection", e);
-          } catch (RuntimeException e) {
-            // Keep serving the queue: if this thread died, no pool could
-            // ever get a new connection again
-            LOG.error("Unexpected error creating a new connection", e);
-          }
-        } catch (InterruptedException e) {
-          LOG.error("The connection creator was interrupted");
-          this.running = false;
-          // Preserve the interrupt status instead of swallowing it
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-    /**
-     * Stop this connection creator. The thread is interrupted so that it
-     * leaves a blocking wait on the queue.
-     */
-    public void shutdown() {
-      this.running = false;
-      this.interrupt();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
deleted file mode 100644
index 5af8a86..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.net.SocketFactory;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
-import org.eclipse.jetty.util.ajax.JSON;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Maintains a pool of connections for each User (including tokens) + NN. The
- * RPC client maintains a single socket, to achieve throughput similar to a NN,
- * each request is multiplexed across multiple sockets/connections from a
- * pool.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ConnectionPool {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ConnectionPool.class);
-
-
-  /** Configuration settings for the connection pool. */
-  private final Configuration conf;
-
-  /** Identifier for this connection pool. */
-  private final ConnectionPoolId connectionPoolId;
-  /** Namenode this pool connects to. */
-  private final String namenodeAddress;
-  /** User for this connections. */
-  private final UserGroupInformation ugi;
-
-  /** Pool of connections. We mimic a COW array. */
-  private volatile List<ConnectionContext> connections = new ArrayList<>();
-  /** Connection index for round-robin. */
-  private final AtomicInteger clientIndex = new AtomicInteger(0);
-
-  /** Min number of connections per user. */
-  private final int minSize;
-  /** Max number of connections per user. */
-  private final int maxSize;
-
-  /** The last time a connection was active. */
-  private volatile long lastActiveTime = 0;
-
-
-  /**
-   * Create a pool for a user and a Namenode and open the minimum number of
-   * connections right away.
-   *
-   * @param config Configuration settings for the connection pool.
-   * @param address Namenode this pool connects to.
-   * @param user User for the connections.
-   * @param minPoolSize Min number of connections in the pool.
-   * @param maxPoolSize Max number of connections in the pool.
-   * @throws IOException If one of the initial connections cannot be created.
-   */
-  protected ConnectionPool(Configuration config, String address,
-      UserGroupInformation user, int minPoolSize, int maxPoolSize)
-          throws IOException {
-
-    this.conf = config;
-
-    // Who connects where
-    this.ugi = user;
-    this.namenodeAddress = address;
-    this.connectionPoolId = new ConnectionPoolId(user, address);
-
-    // Pool size limits
-    this.minSize = minPoolSize;
-    this.maxSize = maxPoolSize;
-
-    // Open the minimum number of connections
-    for (int created = 0; created < minPoolSize; created++) {
-      this.connections.add(newConnection());
-    }
-    LOG.debug("Created connection pool \"{}\" with {} connections",
-        this.connectionPoolId, this.minSize);
-  }
-
-  /**
-   * Get the maximum number of connections allowed in this pool, as given to
-   * the constructor.
-   *
-   * @return Maximum number of connections.
-   */
-  protected int getMaxSize() {
-    return this.maxSize;
-  }
-
-  /**
-   * Get the minimum number of connections in this pool, as given to the
-   * constructor.
-   *
-   * @return Minimum number of connections.
-   */
-  protected int getMinSize() {
-    return this.minSize;
-  }
-
-  /**
-   * Get the connection pool identifier, built from the user and the Namenode
-   * address of this pool.
-   *
-   * @return Connection pool identifier.
-   */
-  protected ConnectionPoolId getConnectionPoolId() {
-    return this.connectionPoolId;
-  }
-
-  /**
-   * Return the next connection round-robin. The first usable connection
-   * starting at the round-robin position is returned; if none is usable, the
-   * last one inspected is returned even though it is active or closed.
-   *
-   * @return Connection context; null if the pool has no connections.
-   */
-  protected ConnectionContext getConnection() {
-
-    this.lastActiveTime = Time.now();
-
-    // Read the list reference once so the whole lookup uses the same list
-    List<ConnectionContext> tmpConnections = this.connections;
-    int size = tmpConnections.size();
-    int threadIndex = this.clientIndex.getAndIncrement();
-
-    ConnectionContext conn = null;
-    if (size > 0) {
-      // The counter wraps around to negative values after Integer.MAX_VALUE
-      // calls and % keeps the sign of the dividend: normalize the starting
-      // position into [0, size) so the list index can never be negative
-      int start = threadIndex % size;
-      if (start < 0) {
-        start += size;
-      }
-      for (int i=0; i<size; i++) {
-        // start and i are both below size, so this sum cannot overflow
-        int index = (start + i) % size;
-        conn = tmpConnections.get(index);
-        if (conn != null && conn.isUsable()) {
-          return conn;
-        }
-      }
-    }
-
-    // We return a connection even if it's active
-    return conn;
-  }
-
-  /**
-   * Add a connection to the current pool. The list of connections is not
-   * modified in place: a copy with the extra connection replaces it. This also
-   * refreshes the last active time of the pool.
-   *
-   * @param conn New connection to add to the pool.
-   */
-  public synchronized void addConnection(ConnectionContext conn) {
-    final List<ConnectionContext> extended =
-        new ArrayList<>(this.connections);
-    extended.add(conn);
-    this.connections = extended;
-
-    this.lastActiveTime = Time.now();
-  }
-
-  /**
-   * Remove connections from the end of the current pool, never going below the
-   * minimum size. The removed connections are not closed here; that is left to
-   * the caller.
-   *
-   * @param num Number of connections to remove.
-   * @return Removed connections.
-   */
-  public synchronized List<ConnectionContext> removeConnections(int num) {
-    final List<ConnectionContext> current = this.connections;
-    final int size = current.size();
-
-    // Keep the first minSize connections and all but the last num ones
-    final int keepCount =
-        Math.min(size, Math.max(this.minSize, size - num));
-
-    final List<ConnectionContext> kept =
-        new ArrayList<>(current.subList(0, keepCount));
-    final List<ConnectionContext> removed =
-        new LinkedList<>(current.subList(keepCount, size));
-    this.connections = kept;
-
-    return removed;
-  }
-
-  /**
-   * Close the connection pool: close every connection and leave the pool
-   * empty.
-   */
-  protected synchronized void close() {
-    long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds(
-        Time.now() - getLastActiveTime());
-    LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago",
-        this.connectionPoolId, timeSinceLastActive);
-
-    // Swap in a new empty list instead of clearing the current one in place:
-    // getConnection() and getNumActiveConnections() walk the list they read
-    // from this field without holding the lock
-    List<ConnectionContext> closing = this.connections;
-    this.connections = new ArrayList<>();
-    for (ConnectionContext connection : closing) {
-      connection.close();
-    }
-  }
-
-  /**
-   * Number of connections in the pool. This is the size of the current list of
-   * connections, read without locking.
-   *
-   * @return Number of connections.
-   */
-  protected int getNumConnections() {
-    return this.connections.size();
-  }
-
-  /**
-   * Number of active connections in the pool.
-   *
-   * @return Number of active connections.
-   */
-  protected int getNumActiveConnections() {
-    int ret = 0;
-
-    List<ConnectionContext> tmpConnections = this.connections;
-    for (ConnectionContext conn : tmpConnections) {
-      if (conn.isActive()) {
-        ret++;
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Get the last time the connection pool was used. The value is refreshed by
-   * getConnection() and addConnection() and stays 0 until one of them has been
-   * called.
-   *
-   * @return Last time the connection pool was used.
-   */
-  protected long getLastActiveTime() {
-    return this.lastActiveTime;
-  }
-
-  /**
-   * String form of the pool, which is the string form of its identifier.
-   *
-   * @return String representation of the connection pool identifier.
-   */
-  @Override
-  public String toString() {
-    return this.connectionPoolId.toString();
-  }
-
-  /**
-   * JSON representation of the connection pool: the number of active and total
-   * connections and, when debug logging is enabled, the active and closed
-   * state of each connection.
-   *
-   * @return String representation of the JSON.
-   */
-  public String getJSON() {
-    final Map<String, String> details = new LinkedHashMap<>();
-    details.put("active", String.valueOf(getNumActiveConnections()));
-    details.put("total", String.valueOf(getNumConnections()));
-    if (LOG.isDebugEnabled()) {
-      final List<ConnectionContext> snapshot = this.connections;
-      for (int pos = 0; pos < snapshot.size(); pos++) {
-        final ConnectionContext context = snapshot.get(pos);
-        details.put(pos + " active", String.valueOf(context.isActive()));
-        details.put(pos + " closed", String.valueOf(context.isClosed()));
-      }
-    }
-    return JSON.toString(details);
-  }
-
-  /**
-   * Create a new proxy wrapper for a client NN connection, using the
-   * configuration, Namenode address and user of this pool. The connection is
-   * not added to the pool.
-   *
-   * @return Proxy for the target ClientProtocol that contains the user's
-   *         security context.
-   * @throws IOException If it cannot be created.
-   */
-  public ConnectionContext newConnection() throws IOException {
-    return newConnection(this.conf, this.namenodeAddress, this.ugi);
-  }
-
-  /**
-   * Creates a proxy wrapper for a client NN connection. Each proxy contains
-   * context for a single user/security context. To maximize throughput it is
-   * recommended to use multiple connection per user+server, allowing multiple
-   * writes and reads to be dispatched in parallel.
-   *
-   * @param conf Configuration for the connection.
-   * @param nnAddress Address of server supporting the ClientProtocol.
-   * @param ugi User context.
-   * @return Proxy for the target ClientProtocol that contains the user's
-   *         security context.
-   * @throws IOException If it cannot be created.
-   */
-  protected static ConnectionContext newConnection(Configuration conf,
-      String nnAddress, UserGroupInformation ugi)
-          throws IOException {
-    // The client protocol is served through the protobuf RPC engine
-    RPC.setProtocolEngine(
-        conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
-
-    final RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
-        conf,
-        HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
-        HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
-        HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
-        HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
-        HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
-
-    final SocketFactory socketFactory = SocketFactory.getDefault();
-    if (UserGroupInformation.isSecurityEnabled()) {
-      SaslRpcServer.init(conf);
-    }
-
-    // Low level protobuf proxy to the Namenode
-    final InetSocketAddress nnSocketAddr =
-        NetUtils.createSocketAddr(nnAddress);
-    final long protocolVersion =
-        RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
-    final int rpcTimeout = RPC.getRpcTimeout(conf);
-    final ClientNamenodeProtocolPB pbProxy = RPC.getProtocolProxy(
-        ClientNamenodeProtocolPB.class, protocolVersion, nnSocketAddr, ugi,
-        conf, socketFactory, rpcTimeout, retryPolicy, null).getProxy();
-
-    // Wrap it into a ClientProtocol and attach the token service
-    final ClientProtocol client =
-        new ClientNamenodeProtocolTranslatorPB(pbProxy);
-    final Text dtService = SecurityUtil.buildTokenService(nnSocketAddr);
-    final ProxyAndInfo<ClientProtocol> clientProxy =
-        new ProxyAndInfo<ClientProtocol>(client, dtService, nnSocketAddr);
-    return new ConnectionContext(clientProxy);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
deleted file mode 100644
index 6e1ee9a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/**
- * Identifier for a connection for a user to a namenode.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
-
-  /** Namenode identifier. */
-  private final String nnId;
-  /** Information about the user. */
-  private final UserGroupInformation ugi;
-
-  /**
-   * New connection pool identifier.
-   *
-   * @param ugi Information of the user issuing the request.
-   * @param nnId Namenode address with port.
-   */
-  public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) {
-    this.nnId = nnId;
-    this.ugi = ugi;
-  }
-
-  @Override
-  public int hashCode() {
-    int hash = new HashCodeBuilder(17, 31)
-        .append(this.nnId)
-        .append(this.ugi.toString())
-        .append(this.getTokenIds())
-        .toHashCode();
-    return hash;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof ConnectionPoolId) {
-      ConnectionPoolId other = (ConnectionPoolId) o;
-      if (!this.nnId.equals(other.nnId)) {
-        return false;
-      }
-      if (!this.ugi.toString().equals(other.ugi.toString())) {
-        return false;
-      }
-      String thisTokens = this.getTokenIds().toString();
-      String otherTokens = other.getTokenIds().toString();
-      return thisTokens.equals(otherTokens);
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return this.ugi + " " + this.getTokenIds() + "->" + this.nnId;
-  }
-
-  @Override
-  public int compareTo(ConnectionPoolId other) {
-    int ret = this.nnId.compareTo(other.nnId);
-    if (ret == 0) {
-      ret = this.ugi.toString().compareTo(other.ugi.toString());
-    }
-    if (ret == 0) {
-      String thisTokens = this.getTokenIds().toString();
-      String otherTokens = other.getTokenIds().toString();
-      ret = thisTokens.compareTo(otherTokens);
-    }
-    return ret;
-  }
-
-  @VisibleForTesting
-  UserGroupInformation getUgi() {
-    return this.ugi;
-  }
-
-  /**
-   * Get the token identifiers for this connection.
-   * @return List with the token identifiers.
-   */
-  private List<String> getTokenIds() {
-    List<String> tokenIds = new ArrayList<>();
-    Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens();
-    for (Token<? extends TokenIdentifier> token : tokens) {
-      byte[] tokenIdBytes = token.getIdentifier();
-      String tokenId = Arrays.toString(tokenIdBytes);
-      tokenIds.add(tokenId);
-    }
-    Collections.sort(tokenIds);
-    return tokenIds;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java
deleted file mode 100644
index a2ac258..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.service.CompositeService.CompositeServiceShutdownHook;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tool to start the {@link Router} for Router-based federation.
- */
-public final class DFSRouter {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DFSRouter.class);
-
-
-  /** Usage string for help message. */
-  private static final String USAGE = "Usage: hdfs dfsrouter";
-
-  /** Priority of the Router shutdown hook. */
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
-
-  private DFSRouter() {
-    // This is just a class to trigger the Router
-  }
-
-  /**
-   * Main run loop for the router.
-   *
-   * @param argv parameters.
-   */
-  public static void main(String[] argv) {
-    if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
-      System.exit(0);
-    }
-
-    try {
-      StringUtils.startupShutdownMessage(Router.class, argv, LOG);
-
-      Router router = new Router();
-
-      ShutdownHookManager.get().addShutdownHook(
-          new CompositeServiceShutdownHook(router), SHUTDOWN_HOOK_PRIORITY);
-
-      Configuration conf = new HdfsConfiguration();
-      router.init(conf);
-      router.start();
-    } catch (Throwable e) {
-      LOG.error("Failed to start router", e);
-      terminate(1, e);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
deleted file mode 100644
index d2b2d50..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
-import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
-import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
-import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
-import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-
-/**
- * Module that implements all the RPC calls in
- * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to
- * Erasure Coding in the {@link RouterRpcServer}.
- */
-public class ErasureCoding {
-
-  /** RPC server to receive client calls. */
-  private final RouterRpcServer rpcServer;
-  /** RPC clients to connect to the Namenodes. */
-  private final RouterRpcClient rpcClient;
-  /** Interface to identify the active NN for a nameservice or blockpool ID. */
-  private final ActiveNamenodeResolver namenodeResolver;
-
-
-  public ErasureCoding(RouterRpcServer server) {
-    this.rpcServer = server;
-    this.rpcClient =  this.rpcServer.getRPCClient();
-    this.namenodeResolver = this.rpcClient.getNamenodeResolver();
-  }
-
-  public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
-      throws IOException {
-    rpcServer.checkOperation(OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("getErasureCodingPolicies");
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]> ret =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, ErasureCodingPolicyInfo[].class);
-    return merge(ret, ErasureCodingPolicyInfo.class);
-  }
-
-  public Map<String, String> getErasureCodingCodecs() throws IOException {
-    rpcServer.checkOperation(OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("getErasureCodingCodecs");
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    @SuppressWarnings("rawtypes")
-    Map<FederationNamespaceInfo, Map> retCodecs =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, Map.class);
-
-    Map<String, String> ret = new HashMap<>();
-    Object obj = retCodecs;
-    @SuppressWarnings("unchecked")
-    Map<FederationNamespaceInfo, Map<String, String>> results =
-        (Map<FederationNamespaceInfo, Map<String, String>>)obj;
-    Collection<Map<String, String>> allCodecs = results.values();
-    for (Map<String, String> codecs : allCodecs) {
-      ret.putAll(codecs);
-    }
-
-    return ret;
-  }
-
-  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
-      ErasureCodingPolicy[] policies) throws IOException {
-    rpcServer.checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("addErasureCodingPolicies",
-        new Class<?>[] {ErasureCodingPolicy[].class}, new Object[] {policies});
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, AddErasureCodingPolicyResponse[]> ret =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, AddErasureCodingPolicyResponse[].class);
-
-    return merge(ret, AddErasureCodingPolicyResponse.class);
-  }
-
-  public void removeErasureCodingPolicy(String ecPolicyName)
-      throws IOException {
-    rpcServer.checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("removeErasureCodingPolicy",
-        new Class<?>[] {String.class}, ecPolicyName);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false);
-  }
-
-  public void disableErasureCodingPolicy(String ecPolicyName)
-      throws IOException {
-    rpcServer.checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("disableErasureCodingPolicy",
-        new Class<?>[] {String.class}, ecPolicyName);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false);
-  }
-
-  public void enableErasureCodingPolicy(String ecPolicyName)
-      throws IOException {
-    rpcServer.checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("enableErasureCodingPolicy",
-        new Class<?>[] {String.class}, ecPolicyName);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false);
-  }
-
-  public ErasureCodingPolicy getErasureCodingPolicy(String src)
-      throws IOException {
-    rpcServer.checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations =
-        rpcServer.getLocationsForPath(src, true);
-    RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy",
-        new Class<?>[] {String.class}, new RemoteParam());
-    ErasureCodingPolicy ret = rpcClient.invokeSequential(
-        locations, remoteMethod, null, null);
-    return ret;
-  }
-
-  public void setErasureCodingPolicy(String src, String ecPolicyName)
-      throws IOException {
-    rpcServer.checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations =
-        rpcServer.getLocationsForPath(src, true);
-    RemoteMethod remoteMethod = new RemoteMethod("setErasureCodingPolicy",
-        new Class<?>[] {String.class, String.class},
-        new RemoteParam(), ecPolicyName);
-    rpcClient.invokeSequential(locations, remoteMethod, null, null);
-  }
-
-  public void unsetErasureCodingPolicy(String src) throws IOException {
-    rpcServer.checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations =
-        rpcServer.getLocationsForPath(src, true);
-    RemoteMethod remoteMethod = new RemoteMethod("unsetErasureCodingPolicy",
-        new Class<?>[] {String.class}, new RemoteParam());
-    rpcClient.invokeSequential(locations, remoteMethod, null, null);
-  }
-
-  public ECBlockGroupStats getECBlockGroupStats() throws IOException {
-    rpcServer.checkOperation(OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("getECBlockGroupStats");
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, ECBlockGroupStats> allStats =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, ECBlockGroupStats.class);
-
-    // Merge the stats from all the namespaces
-    long lowRedundancyBlockGroups = 0;
-    long corruptBlockGroups = 0;
-    long missingBlockGroups = 0;
-    long bytesInFutureBlockGroups = 0;
-    long pendingDeletionBlocks = 0;
-    for (ECBlockGroupStats stats : allStats.values()) {
-      lowRedundancyBlockGroups += stats.getLowRedundancyBlockGroups();
-      corruptBlockGroups += stats.getCorruptBlockGroups();
-      missingBlockGroups += stats.getMissingBlockGroups();
-      bytesInFutureBlockGroups += stats.getBytesInFutureBlockGroups();
-      pendingDeletionBlocks += stats.getPendingDeletionBlocks();
-    }
-    return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
-        missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
deleted file mode 100644
index 3dfd998..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.reflect.Constructor;
-import java.net.URL;
-import java.net.URLConnection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
-import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
-import org.apache.hadoop.util.VersionInfo;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for managing HDFS federation.
- */
-public final class FederationUtil {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(FederationUtil.class);
-
-  private FederationUtil() {
-    // Utility Class
-  }
-
-  /**
-   * Get a JMX data from a web endpoint.
-   *
-   * @param beanQuery JMX bean.
-   * @param webAddress Web address of the JMX endpoint.
-   * @return JSON with the JMX data
-   */
-  public static JSONArray getJmx(String beanQuery, String webAddress) {
-    JSONArray ret = null;
-    BufferedReader reader = null;
-    try {
-      String host = webAddress;
-      int port = -1;
-      if (webAddress.indexOf(":") > 0) {
-        String[] webAddressSplit = webAddress.split(":");
-        host = webAddressSplit[0];
-        port = Integer.parseInt(webAddressSplit[1]);
-      }
-      URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
-      URLConnection conn = jmxURL.openConnection();
-      conn.setConnectTimeout(5 * 1000);
-      conn.setReadTimeout(5 * 1000);
-      InputStream in = conn.getInputStream();
-      InputStreamReader isr = new InputStreamReader(in, "UTF-8");
-      reader = new BufferedReader(isr);
-
-      StringBuilder sb = new StringBuilder();
-      String line = null;
-      while ((line = reader.readLine()) != null) {
-        sb.append(line);
-      }
-      String jmxOutput = sb.toString();
-
-      // Parse JSON
-      JSONObject json = new JSONObject(jmxOutput);
-      ret = json.getJSONArray("beans");
-    } catch (IOException e) {
-      LOG.error("Cannot read JMX bean {} from server {}: {}",
-          beanQuery, webAddress, e.getMessage());
-    } catch (JSONException e) {
-      LOG.error("Cannot parse JMX output for {} from server {}: {}",
-          beanQuery, webAddress, e.getMessage());
-    } catch (Exception e) {
-      LOG.error("Cannot parse JMX output for {} from server {}: {}",
-          beanQuery, webAddress, e);
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException e) {
-          LOG.error("Problem closing {}", webAddress, e);
-        }
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Fetch the Hadoop version string for this jar.
-   *
-   * @return Hadoop version string, e.g., 3.0.1.
-   */
-  public static String getVersion() {
-    return VersionInfo.getVersion();
-  }
-
-  /**
-   * Fetch the build/compile information for this jar.
-   *
-   * @return String Compilation info.
-   */
-  public static String getCompileInfo() {
-    return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
-        + VersionInfo.getBranch();
-  }
-
-  /**
-   * Create an instance of an interface with a constructor using a context.
-   *
-   * @param conf Configuration for the class names.
-   * @param context Context object to pass to the instance.
-   * @param contextClass Type of the context passed to the constructor.
-   * @param clazz Class of the object to return.
-   * @return New instance of the specified class that implements the desired
-   *         interface and a single parameter constructor containing a
-   *         StateStore reference.
-   */
-  private static <T, R> T newInstance(final Configuration conf,
-      final R context, final Class<R> contextClass, final Class<T> clazz) {
-    try {
-      if (contextClass == null) {
-        // Default constructor if no context
-        Constructor<T> constructor = clazz.getConstructor();
-        return constructor.newInstance();
-      } else {
-        // Constructor with context
-        Constructor<T> constructor = clazz.getConstructor(
-            Configuration.class, contextClass);
-        return constructor.newInstance(conf, context);
-      }
-    } catch (ReflectiveOperationException e) {
-      LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e);
-      return null;
-    }
-  }
-
-  /**
-   * Creates an instance of a FileSubclusterResolver from the configuration.
-   *
-   * @param conf Configuration that defines the file resolver class.
-   * @param router Router service.
-   * @return New file subcluster resolver.
-   */
-  public static FileSubclusterResolver newFileSubclusterResolver(
-      Configuration conf, Router router) {
-    Class<? extends FileSubclusterResolver> clazz = conf.getClass(
-        DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
-        DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
-        FileSubclusterResolver.class);
-    return newInstance(conf, router, Router.class, clazz);
-  }
-
-  /**
-   * Creates an instance of an ActiveNamenodeResolver from the configuration.
-   *
-   * @param conf Configuration that defines the namenode resolver class.
-   * @param stateStore State store passed to class constructor.
-   * @return New active namenode resolver.
-   */
-  public static ActiveNamenodeResolver newActiveNamenodeResolver(
-      Configuration conf, StateStoreService stateStore) {
-    Class<? extends ActiveNamenodeResolver> clazz = conf.getClass(
-        DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
-        DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT,
-        ActiveNamenodeResolver.class);
-    return newInstance(conf, stateStore, StateStoreService.class, clazz);
-  }
-
-  /**
-   * Check if the given path is the child of parent path.
-   * @param path Path to be check.
-   * @param parent Parent path.
-   * @return True if parent path is parent entry for given path.
-   */
-  public static boolean isParentEntry(final String path, final String parent) {
-    if (!path.startsWith(parent)) {
-      return false;
-    }
-
-    if (path.equals(parent)) {
-      return true;
-    }
-
-    return path.charAt(parent.length()) == Path.SEPARATOR_CHAR
-        || parent.equals(Path.SEPARATOR);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message