hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1566530 [2/5] - in /hbase/branches/hbase-10070: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/test/java/org/apache/hadoop/hbase/client/ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ hbase-server/s...
Date Mon, 10 Feb 2014 08:55:38 GMT
Added: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1566530&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (added)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Mon Feb 10 08:55:37 2014
@@ -0,0 +1,2564 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * An internal, non-instantiable class that manages creation of {@link HConnection}s.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Private
+// NOTE: DO NOT make this class public. It was made package-private on purpose.
+class ConnectionManager {
+  static final Log LOG = LogFactory.getLog(ConnectionManager.class);
+
+  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
+  private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
+
+  // An LRU map of HConnectionKey -> HConnectionImplementation. The map is kept in access
+  // order and evicts its least-recently-used entry once MAX_CACHED_CONNECTION_INSTANCES is
+  // exceeded. All access must be synchronized on the map itself. It is not private because
+  // tests need to be able to tinker with it.
+  static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
+
+  /** Upper bound on the number of entries kept in {@link #CONNECTION_INSTANCES}. */
+  public static final int MAX_CACHED_CONNECTION_INSTANCES;
+
+  /**
+   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
+   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
+   */
+  private static volatile NonceGenerator nonceGenerator = null;
+  /**
+   * The nonce generator lock. Only taken when creating HConnection, which gets a private copy.
+   * It is final so every thread is guaranteed to synchronize on the same monitor.
+   */
+  private static final Object nonceGeneratorCreateLock = new Object();
+
+  static {
+    // We set instances to one more than the value specified for {@link
+    // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
+    // connections to the ensemble from the one client is 30, so in that case we
+    // should run into zk issues before the LRU hit this value of 31.
+    MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
+      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
+    // The trailing 'true' selects access order, which is what makes removeEldestEntry an LRU.
+    CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
+        (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
+      @Override
+      protected boolean removeEldestEntry(
+          Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
+        // Evicted connections are only dropped from the cache; they are not closed here.
+        return size() > MAX_CACHED_CONNECTION_INSTANCES;
+      }
+    };
+  }
+
+  /**
+   * NonceGenerator used when client nonces are disabled. Both the nonce group and every
+   * generated nonce are {@link HConstants#NO_NONCE}.
+   */
+  static class NoNonceGenerator implements NonceGenerator {
+    @Override
+    public long newNonce() {
+      return HConstants.NO_NONCE;
+    }
+
+    @Override
+    public long getNonceGroup() {
+      return HConstants.NO_NONCE;
+    }
+  }
+
+  /*
+   * Non-instantiable.
+   */
+  private ConnectionManager() {
+    super();
+  }
+
+  /**
+   * @param conn The connection for which to replace the generator.
+   * @param cnm Replaces the nonce generator used, for testing.
+   * @return old nonce generator.
+   */
+  @VisibleForTesting
+  static NonceGenerator injectNonceGeneratorForTesting(
+      HConnection conn, NonceGenerator cnm) {
+    HConnectionImplementation connImpl = (HConnectionImplementation)conn;
+    NonceGenerator ng = connImpl.getNonceGenerator();
+    LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
+    connImpl.nonceGenerator = cnm;
+    return ng;
+  }
+
+  /**
+   * Get the connection that goes with the passed <code>conf</code> configuration instance.
+   * If no current connection exists, method creates a new connection and keys it using
+   * connection-specific properties from the passed {@link Configuration}; see
+   * {@link HConnectionKey}. The returned connection is shared through a cache, and each call
+   * increments its reference count.
+   * @param conf configuration
+   * @return HConnection object for <code>conf</code>
+   * @throws IOException if a new connection has to be created and creation fails
+   * @deprecated shared, managed connections are deprecated; use
+   *   {@link #createConnection(Configuration)} and close the returned connection yourself.
+   */
+  @Deprecated
+  public static HConnection getConnection(final Configuration conf) throws IOException {
+    return getConnectionInternal(conf);
+  }
+
+
+  static ClusterConnection getConnectionInternal(final Configuration conf)
+    throws IOException {
+    HConnectionKey connectionKey = new HConnectionKey(conf);
+    synchronized (CONNECTION_INSTANCES) {
+      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
+      if (connection == null) {
+        connection = (HConnectionImplementation)createConnection(conf, true);
+        CONNECTION_INSTANCES.put(connectionKey, connection);
+      } else if (connection.isClosed()) {
+        ConnectionManager.deleteConnection(connectionKey, true);
+        connection = (HConnectionImplementation)createConnection(conf, true);
+        CONNECTION_INSTANCES.put(connectionKey, connection);
+      }
+      connection.incCount();
+      return connection;
+    }
+  }
+
+  /**
+   * Create a new HConnection instance using the passed <code>conf</code> instance.
+   * <p>Note: This bypasses the usual HConnection life cycle management done by
+   * {@link #getConnection(Configuration)}. The caller is responsible for
+   * calling {@link HConnection#close()} on the returned connection instance.
+   * The connection is created for the current user, as reported by the configured
+   * {@link UserProvider}.
+   *
+   * This is the recommended way to create HConnections.
+   * {@code
+   * HConnection connection = HConnectionManager.createConnection(conf);
+   * HTableInterface table = connection.getTable("mytable");
+   * table.get(...);
+   * ...
+   * table.close();
+   * connection.close();
+   * }
+   *
+   * @param conf configuration
+   * @return HConnection object for <code>conf</code>
+   * @throws IOException if the connection could not be created
+   */
+  public static HConnection createConnection(Configuration conf) throws IOException {
+    return createConnectionInternal(conf);
+  }
+
+  static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
+    UserProvider provider = UserProvider.instantiate(conf);
+    return createConnection(conf, false, null, provider.getCurrent());
+  }
+
+  /**
+   * Create a new HConnection instance using the passed <code>conf</code> instance.
+   * <p>Note: This bypasses the usual HConnection life cycle management done by
+   * {@link #getConnection(Configuration)}. The caller is responsible for
+   * calling {@link HConnection#close()} on the returned connection instance.
+   * The connection is created for the current user, as reported by the configured
+   * {@link UserProvider}.
+   * This is the recommended way to create HConnections.
+   * {@code
+   * ExecutorService pool = ...;
+   * HConnection connection = HConnectionManager.createConnection(conf, pool);
+   * HTableInterface table = connection.getTable("mytable");
+   * table.get(...);
+   * ...
+   * table.close();
+   * connection.close();
+   * }
+   * @param conf configuration
+   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
+   * @return HConnection object for <code>conf</code>
+   * @throws IOException if the connection could not be created
+   */
+  public static HConnection createConnection(Configuration conf, ExecutorService pool)
+  throws IOException {
+    UserProvider provider = UserProvider.instantiate(conf);
+    return createConnection(conf, false, pool, provider.getCurrent());
+  }
+
+  /**
+   * Create a new HConnection instance using the passed <code>conf</code> instance, on behalf
+   * of the given user.
+   * <p>Note: This bypasses the usual HConnection life cycle management done by
+   * {@link #getConnection(Configuration)}. The caller is responsible for
+   * calling {@link HConnection#close()} on the returned connection instance.
+   * This is the recommended way to create HConnections.
+   * {@code
+   * User user = ...;
+   * HConnection connection = HConnectionManager.createConnection(conf, user);
+   * HTableInterface table = connection.getTable("mytable");
+   * table.get(...);
+   * ...
+   * table.close();
+   * connection.close();
+   * }
+   * @param conf configuration
+   * @param user the user the connection is for
+   * @return HConnection object for <code>conf</code>
+   * @throws IOException if the connection could not be created
+   */
+  public static HConnection createConnection(Configuration conf, User user)
+  throws IOException {
+    return createConnection(conf, false, null, user);
+  }
+
+  /**
+   * Create a new HConnection instance using the passed <code>conf</code> instance, on behalf
+   * of the given user and with the given batch pool.
+   * <p>Note: This bypasses the usual HConnection life cycle management done by
+   * {@link #getConnection(Configuration)}. The caller is responsible for
+   * calling {@link HConnection#close()} on the returned connection instance.
+   * This is the recommended way to create HConnections.
+   * {@code
+   * ExecutorService pool = ...;
+   * User user = ...;
+   * HConnection connection = HConnectionManager.createConnection(conf, pool, user);
+   * HTableInterface table = connection.getTable("mytable");
+   * table.get(...);
+   * ...
+   * table.close();
+   * connection.close();
+   * }
+   * @param conf configuration
+   * @param pool the thread pool to use for batch operation in HTables used via this HConnection
+   * @param user the user the connection is for
+   * @return HConnection object for <code>conf</code>
+   * @throws IOException if the connection could not be created
+   */
+  public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
+  throws IOException {
+    return createConnection(conf, false, pool, user);
+  }
+
+  @Deprecated
+  static HConnection createConnection(final Configuration conf, final boolean managed)
+      throws IOException {
+    UserProvider provider = UserProvider.instantiate(conf);
+    return createConnection(conf, managed, null, provider.getCurrent());
+  }
+
+  @Deprecated
+  static ClusterConnection createConnection(final Configuration conf, final boolean managed,
+      final ExecutorService pool, final User user)
+  throws IOException {
+    String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
+      HConnectionImplementation.class.getName());
+    Class<?> clazz = null;
+    try {
+      clazz = Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    try {
+      // Default HCM#HCI is not accessible; make it so before invoking.
+      Constructor<?> constructor =
+        clazz.getDeclaredConstructor(Configuration.class,
+          boolean.class, ExecutorService.class, User.class);
+      constructor.setAccessible(true);
+      return (ClusterConnection) constructor.newInstance(conf, managed, pool, user);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Delete connection information for the instance specified by passed configuration.
+   * The reference count of the cached connection keyed by <code>conf</code> is decremented.
+   * Once no references remain, the connection is removed from the cache and closed.
+   *
+   * @param conf configuration whose identity is used to find {@link HConnection} instance.
+   * @deprecated part of the deprecated managed-connection cache; prefer connections from
+   *   {@link #createConnection(Configuration)}, closed directly by their owner.
+   */
+  @Deprecated
+  public static void deleteConnection(Configuration conf) {
+    deleteConnection(new HConnectionKey(conf), false);
+  }
+
+  /**
+   * Cleanup a known stale connection. The connection is removed from the cache and closed
+   * regardless of its reference count. Nothing happens if <code>connection</code> is not cached.
+   *
+   * @param connection the stale connection
+   * @deprecated part of the deprecated managed-connection cache.
+   */
+  @Deprecated
+  public static void deleteStaleConnection(HConnection connection) {
+    deleteConnection(connection, true);
+  }
+
+  /**
+   * Delete information for all cached connections. Each entry is released as by
+   * deleteConnection(HConnectionKey, boolean):
+   * <ul>
+   *   <li>when <code>staleConnection</code> is true, every cached connection is closed;</li>
+   *   <li>otherwise, only connections whose reference count drops to zero are closed.</li>
+   * </ul>
+   * The cache is cleared afterwards either way, so connections that were still referenced are
+   * dropped from it without being closed. By default, you should use it with staleConnection
+   * set to true.
+   * @param staleConnection whether to close connections regardless of their reference count
+   * @deprecated part of the deprecated managed-connection cache.
+   */
+  @Deprecated
+  public static void deleteAllConnections(boolean staleConnection) {
+    synchronized (CONNECTION_INSTANCES) {
+      // Iterate over a snapshot: deleteConnection removes entries from the map.
+      Set<HConnectionKey> connectionKeys =
+          new HashSet<HConnectionKey>(CONNECTION_INSTANCES.keySet());
+      for (HConnectionKey connectionKey : connectionKeys) {
+        deleteConnection(connectionKey, staleConnection);
+      }
+      CONNECTION_INSTANCES.clear();
+    }
+  }
+
+  /**
+   * Delete information for all connections without force-closing connections that are still
+   * referenced; equivalent to {@code deleteAllConnections(false)}.
+   * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
+   */
+  @Deprecated
+  public static void deleteAllConnections() {
+    final boolean closeStillReferenced = false;
+    deleteAllConnections(closeStillReferenced);
+  }
+
+
+  /**
+   * Finds the cache key whose value is <code>connection</code> (compared by identity) and
+   * deletes that entry. Nothing happens if the connection is not cached.
+   */
+  @Deprecated
+  private static void deleteConnection(HConnection connection, boolean staleConnection) {
+    synchronized (CONNECTION_INSTANCES) {
+      boolean found = false;
+      HConnectionKey matchingKey = null;
+      for (Entry<HConnectionKey, HConnectionImplementation> entry
+          : CONNECTION_INSTANCES.entrySet()) {
+        if (entry.getValue() == connection) {
+          found = true;
+          matchingKey = entry.getKey();
+          break;
+        }
+      }
+      if (found) {
+        deleteConnection(matchingKey, staleConnection);
+      }
+    }
+  }
+
+  @Deprecated
+  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
+    synchronized (CONNECTION_INSTANCES) {
+      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
+      if (connection != null) {
+        connection.decCount();
+        if (connection.isZeroReference() || staleConnection) {
+          CONNECTION_INSTANCES.remove(connectionKey);
+          connection.internalClose();
+        }
+      } else {
+        LOG.error("Connection not found in the list, can't delete it "+
+          "(connection key=" + connectionKey + "). May be the key was modified?", new Exception());
+      }
+    }
+  }
+
+  /**
+   * Returns the number of region locations cached for <code>tableName</code> by the shared
+   * connection for <code>conf</code>. Provided for unit test cases which verify the behavior
+   * of region location cache prefetch.
+   * @param conf configuration identifying the shared connection to inspect
+   * @param tableName table whose cached region locations are counted
+   * @return Number of cached regions for the table.
+   * @throws IOException if the connection cannot be obtained or released
+   */
+  static int getCachedRegionCount(Configuration conf, final TableName tableName)
+  throws IOException {
+    return execute(new HConnectable<Integer>(conf) {
+      @Override
+      public Integer connect(HConnection connection) {
+        return ((HConnectionImplementation)connection).getNumberOfCachedRegionLocations(tableName);
+      }
+    });
+  }
+
+  /**
+   * Checks whether the shared connection for <code>conf</code> has cached the location of the
+   * region holding <code>row</code> in <code>tableName</code>. Provided for unit test cases
+   * which verify the behavior of region location cache prefetch.
+   * @param conf configuration identifying the shared connection to inspect
+   * @param tableName table of the row
+   * @param row row key to look up
+   * @return true if the region where the table and row reside is cached.
+   * @throws IOException if the connection cannot be obtained or released
+   */
+  static boolean isRegionCached(Configuration conf,
+                                final TableName tableName,
+                                final byte[] row)
+  throws IOException {
+    return execute(new HConnectable<Boolean>(conf) {
+      @Override
+      public Boolean connect(HConnection connection) {
+        return ((HConnectionImplementation) connection).metaCache.isRegionCached(tableName, row);
+      }
+    });
+  }
+
+  /**
+   * Runs {@link HConnectable#connect} against the shared connection obtained for
+   * {@code connectable.conf}, then closes that connection.
+   * <p>If {@code connect} completes normally but closing fails, the close failure is reported
+   * as an {@link IOException}. If {@code connect} itself threw, a close failure is suppressed
+   * so that the original exception propagates. In both cases the close failure is first passed
+   * to {@link ExceptionUtil#rethrowIfInterrupt}.
+   *
+   * @param <T> the return type of the connect method
+   * @param connectable the {@link HConnectable} instance; may be null
+   * @return the value returned by the connect method, or null when {@code connectable} or its
+   *   configuration is null
+   * @throws IOException if the connection cannot be obtained or, after a successful
+   *   {@code connect}, cannot be closed
+   */
+  @InterfaceAudience.Private
+  public static <T> T execute(HConnectable<T> connectable) throws IOException {
+    if (connectable == null || connectable.conf == null) {
+      return null;
+    }
+    final HConnection connection = getConnection(connectable.conf);
+    boolean connected = false;
+    try {
+      final T result = connectable.connect(connection);
+      connected = true;
+      return result;
+    } finally {
+      try {
+        connection.close();
+      } catch (Exception closeFailure) {
+        ExceptionUtil.rethrowIfInterrupt(closeFailure);
+        if (connected) {
+          throw new IOException("The connection to " + connection
+              + " could not be deleted.", closeFailure);
+        }
+      }
+    }
+  }
+
+  /** Encapsulates connection to zookeeper and regionservers.*/
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+      justification="Access to the conncurrent hash map is under a lock so should be fine.")
+  static class HConnectionImplementation implements ClusterConnection, Closeable {
+    static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
+    // Read from HConstants.HBASE_CLIENT_PAUSE in the constructor.
+    private final long pause;
+    // Read from HConstants.HBASE_CLIENT_RETRIES_NUMBER in the constructor.
+    private final int numTries;
+    // Read from HConstants.HBASE_RPC_TIMEOUT_KEY in the constructor; package-private.
+    final int rpcTimeout;
+    // Either the process-wide ConnectionManager.nonceGenerator, or a NoNonceGenerator when
+    // nonces are disabled. Tests may swap it via injectNonceGeneratorForTesting.
+    private NonceGenerator nonceGenerator = null;
+    // Read from HConstants.HBASE_CLIENT_PREFETCH_LIMIT in the constructor.
+    private final int prefetchRegionLimit;
+    // Built by createAsyncProcess(conf) in the constructor.
+    private final AsyncProcess asyncProcess;
+
+    // Lifecycle flags; volatile so writes are visible to other threads without locking.
+    private volatile boolean closed;
+    private volatile boolean aborted;
+
+    // package protected for the tests
+    // Only created when HConstants.STATUS_PUBLISHED is true and a listener class is configured.
+    ClusterStatusListener clusterStatusListener;
+
+    // NOTE(review): not used in the visible part of this class; presumably guards user-region
+    // lookups further down — confirm.
+    private final Object userRegionLock = new Object();
+
+    // We have a single lock for master & zk to prevent deadlocks. Having
+    //  one lock for ZK and one lock for master is not possible:
+    //  When creating a connection to master, we need a connection to ZK to get
+    //  its address. But another thread could have taken the ZK lock, and could
+    //  be waiting for the master lock => deadlock.
+    private final Object masterAndZKLock = new Object();
+
+    // NOTE(review): semantics not visible here; presumably a timestamp after which an idle
+    // ZooKeeper watcher may be closed by delayedClosing — confirm.
+    private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+    // NOTE(review): 'this' escapes from a field initializer, so DelayedClosing receives a
+    // partially constructed connection — confirm createAndStart does not use it before
+    // construction completes. Field order matters here; do not move this declaration casually.
+    private final DelayedClosing delayedClosing =
+      DelayedClosing.createAndStart(this);
+
+    // thread executor shared by all HTableInterface instances created
+    // by this connection
+    // Either supplied by the caller at construction, or created lazily by getBatchPool().
+    private volatile ExecutorService batchPool = null;
+    // True only when this connection created batchPool itself and must shut it down.
+    private volatile boolean cleanupPool = false;
+
+    private final Configuration conf;
+
+    // Client rpc instance.
+    // Replaceable in tests via setRpcClient.
+    private RpcClient rpcClient;
+
+    // Region location cache; queried by ConnectionManager.isRegionCached.
+    private MetaCache metaCache = new MetaCache();
+
+    // Reference count for shared (managed) connections; presumably maintained by
+    // incCount()/decCount(), which are not visible here.
+    private int refCount;
+
+    // indicates whether this connection's life cycle is managed (by us)
+    private boolean managed;
+
+    // User the connection was created for; may be null (see the two-argument constructor).
+    private User user;
+
+    /**
+     * Cluster registry of basic info such as clusterid and meta region location.
+     * Set by setupRegistry() in the four-argument constructor.
+     */
+     Registry registry;
+
+    /**
+     * Convenience constructor with no caller-supplied batch pool and no explicit user.
+     */
+    HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
+      this(conf, managed, null, null);
+    }
+
+    /**
+     * Full constructor: sets up the registry, the cluster id and the RPC client, and
+     * optionally a cluster status listener.
+     * @param conf Configuration object
+     * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
+     * to zk and shutdown of all services; we just close down the resources this connection was
+     * responsible for and decrement usage counters.  It is up to the caller to do the full
+     * cleanup.  It is set when we want to have connection sharing going on -- reuse of zk
+     * connection, and cached region locations, established regionserver connections, etc.  When
+     * connections are shared, we have reference counting going on and will only do full cleanup
+     * when there are no more users of an HConnectionImplementation instance.
+     * @param pool thread pool for batch operations of HTables obtained from this connection;
+     * when null, getBatchPool() creates one lazily.
+     * @param user the user the connection is for; may be null
+     * @throws IOException if the cluster registry cannot be set up
+     */
+    HConnectionImplementation(Configuration conf, boolean managed,
+        ExecutorService pool, User user) throws IOException {
+      this(conf);
+      this.user = user;
+      this.batchPool = pool;
+      this.managed = managed;
+      // Order matters: retrieveClusterId() reads this.registry, and the RpcClient is
+      // constructed with the cluster id it produces.
+      this.registry = setupRegistry();
+      retrieveClusterId();
+
+      this.rpcClient = new RpcClient(this.conf, this.clusterId);
+
+      // Do we publish the status?
+      boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
+          HConstants.STATUS_PUBLISHED_DEFAULT);
+      Class<? extends ClusterStatusListener.Listener> listenerClass =
+          conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
+              ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
+              ClusterStatusListener.Listener.class);
+      if (shouldListen) {
+        if (listenerClass == null) {
+          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
+              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
+        } else {
+          clusterStatusListener = new ClusterStatusListener(
+              new ClusterStatusListener.DeadServerHandler() {
+                // On a dead-server notification, call clearCaches(sn) and cancel RPC
+                // connections to that host:port. The rpcClient field is read at notification
+                // time, so a client installed later via setRpcClient is the one used.
+                @Override
+                public void newDead(ServerName sn) {
+                  clearCaches(sn);
+                  rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
+                      new SocketException(sn.getServerName() +
+                          " is dead: closing its connection."));
+                }
+              }, conf, listenerClass);
+        }
+      }
+    }
+
+    /**
+     * For tests.
+     */
+    protected HConnectionImplementation(Configuration conf) {
+      this.conf = conf;
+      this.closed = false;
+      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+      this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+      this.rpcTimeout = conf.getInt(
+          HConstants.HBASE_RPC_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+      if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
+        synchronized (nonceGeneratorCreateLock) {
+          if (ConnectionManager.nonceGenerator == null) {
+            ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
+          }
+          this.nonceGenerator = ConnectionManager.nonceGenerator;
+        }
+      } else {
+        this.nonceGenerator = new NoNonceGenerator();
+      }
+      this.asyncProcess = createAsyncProcess(this.conf);
+
+      this.prefetchRegionLimit = conf.getInt(
+          HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+          HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+    }
+
+    /** Converts the name and delegates to {@link #getTable(TableName)}. */
+    @Override
+    public HTableInterface getTable(String tableName) throws IOException {
+      final TableName name = TableName.valueOf(tableName);
+      return getTable(name);
+    }
+
+    /** Converts the name and delegates to {@link #getTable(TableName)}. */
+    @Override
+    public HTableInterface getTable(byte[] tableName) throws IOException {
+      final TableName name = TableName.valueOf(tableName);
+      return getTable(name);
+    }
+
+    /** Uses this connection's shared batch pool, creating it on first use. */
+    @Override
+    public HTableInterface getTable(TableName tableName) throws IOException {
+      final ExecutorService sharedPool = getBatchPool();
+      return getTable(tableName, sharedPool);
+    }
+
+    /** Converts the name and delegates to {@link #getTable(TableName, ExecutorService)}. */
+    @Override
+    public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
+      final TableName name = TableName.valueOf(tableName);
+      return getTable(name, pool);
+    }
+
+    /** Converts the name and delegates to {@link #getTable(TableName, ExecutorService)}. */
+    @Override
+    public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
+      final TableName name = TableName.valueOf(tableName);
+      return getTable(name, pool);
+    }
+
+    /**
+     * Creates an HTable bound to this connection and <code>pool</code>.
+     * @throws IOException if this connection is managed; only unmanaged connections hand out
+     *   tables
+     */
+    @Override
+    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
+      if (!managed) {
+        return new HTable(tableName, this, pool);
+      }
+      throw new IOException("The connection has to be unmanaged.");
+    }
+
+    /**
+     * Returns the thread pool shared by HTables created from this connection without an
+     * explicit pool, creating it lazily on first use. A pool created here is flagged with
+     * cleanupPool so that shutdownBatchPool() will shut it down.
+     * <p>Sizing comes from "hbase.hconnection.threads.max" and "hbase.hconnection.threads.core"
+     * (0 means 8 threads per available processor) and from
+     * "hbase.hconnection.threads.keepalivetime" (seconds). The core size is capped at the max
+     * size because ThreadPoolExecutor rejects a core size larger than the maximum.
+     */
+    private ExecutorService getBatchPool() {
+      if (batchPool == null) {
+        // shared HTable thread executor not yet initialized
+        synchronized (this) {
+          if (batchPool == null) {
+            int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256);
+            int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256);
+            if (maxThreads == 0) {
+              maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+            }
+            if (coreThreads == 0) {
+              coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+            }
+            // Without this cap, e.g. threads.max=0 with the default threads.core=256 on a small
+            // machine makes the ThreadPoolExecutor constructor throw IllegalArgumentException.
+            coreThreads = Math.min(coreThreads, maxThreads);
+            long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+            LinkedBlockingQueue<Runnable> workQueue =
+              new LinkedBlockingQueue<Runnable>(maxThreads *
+                conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+            ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+                coreThreads,
+                maxThreads,
+                keepAliveTime,
+                TimeUnit.SECONDS,
+                workQueue,
+                Threads.newDaemonThreadFactory(toString() + "-shared-"));
+            tpe.allowCoreThreadTimeOut(true);
+            // Mark the pool as ours before publishing it through the volatile field. Any thread
+            // that observes the new pool then also observes cleanupPool == true.
+            this.cleanupPool = true;
+            this.batchPool = tpe;
+          }
+        }
+      }
+      return this.batchPool;
+    }
+
+    protected ExecutorService getCurrentBatchPool() {
+      return batchPool;
+    }
+
+    private void shutdownBatchPool() {
+      if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
+        this.batchPool.shutdown();
+        try {
+          if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
+            this.batchPool.shutdownNow();
+          }
+        } catch (InterruptedException e) {
+          this.batchPool.shutdownNow();
+        }
+      }
+    }
+
+    /**
+     * @return The cluster registry implementation to use.
+     * @throws IOException
+     */
+    private Registry setupRegistry() throws IOException {
+      String registryClass = this.conf.get("hbase.client.registry.impl",
+        ZooKeeperRegistry.class.getName());
+      Registry registry = null;
+      try {
+        registry = (Registry)Class.forName(registryClass).newInstance();
+      } catch (Throwable t) {
+        throw new IOException(t);
+      }
+      registry.init(this);
+      return registry;
+    }
+
+    /**
+     * For tests only.
+     * @param rpcClient Client we should use instead.
+     * @return Previous rpcClient
+     */
+    RpcClient setRpcClient(final RpcClient rpcClient) {
+      RpcClient oldRpcClient = this.rpcClient;
+      this.rpcClient = rpcClient;
+      return oldRpcClient;
+    }
+
+    /**
+     * An identifier that will remain the same for a given connection.
+     * @return "hconnection-0x" followed by this object's hash code in hexadecimal
+     */
+    @Override
+    public String toString() {
+      final String hex = Integer.toHexString(hashCode());
+      return "hconnection-0x" + hex;
+    }
+
+    // Populated by retrieveClusterId(); never null once that has run.
+    protected String clusterId = null;
+
+    /**
+     * Populates clusterId from the registry, falling back to HConstants.CLUSTER_ID_DEFAULT
+     * when the registry returns null. Nothing happens if clusterId is already set.
+     */
+    void retrieveClusterId() {
+      if (clusterId == null) {
+        this.clusterId = this.registry.getClusterId();
+        if (clusterId == null) {
+          clusterId = HConstants.CLUSTER_ID_DEFAULT;
+          LOG.debug("clusterid came back null, using default " + clusterId);
+        }
+      }
+    }
+
+    /** @return the Configuration this connection was built from. */
+    @Override
+    public Configuration getConfiguration() {
+      final Configuration configuration = this.conf;
+      return configuration;
+    }
+
+    private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
+      throws MasterNotRunningException {
+      String errorMsg;
+      try {
+        if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
+          errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
+            + "It should have been written by the master. "
+            + "Check the value configured in 'zookeeper.znode.parent'. "
+            + "There could be a mismatch with the one configured in the master.";
+          LOG.error(errorMsg);
+          throw new MasterNotRunningException(errorMsg);
+        }
+      } catch (KeeperException e) {
+        errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
+        LOG.error(errorMsg);
+        throw new MasterNotRunningException(errorMsg, e);
+      }
+    }
+
+    /**
+     * Checks that the master is reachable by acquiring, then immediately releasing, the
+     * keep-alive master connection.
+     * @return true if the master is running, throws an exception otherwise
+     * @throws MasterNotRunningException - if the master is not running
+     * @throws ZooKeeperConnectionException
+     */
+    @Override
+    public boolean isMasterRunning()
+    throws MasterNotRunningException, ZooKeeperConnectionException {
+      // Acquiring the master connection already verifies the master is running and throws
+      // otherwise, so reaching the next line is itself the proof.
+      final MasterKeepAliveConnection master = getKeepAliveMasterService();
+      master.close();
+      return true;
+    }
+
+    /**
+     * Finds the location of the region holding {@code row}.
+     * @param reload true to go through {@code relocateRegion} (cache bypassed, disabled tables
+     *   rejected); false to use {@code locateRegion}, which may answer from the cache
+     */
+    @Override
+    public HRegionLocation getRegionLocation(final TableName tableName,
+        final byte [] row, boolean reload)
+    throws IOException {
+      if (reload) {
+        return relocateRegion(tableName, row);
+      }
+      return locateRegion(tableName, row);
+    }
+
+    /** Byte-array table name variant of {@code getRegionLocation(TableName, byte[], boolean)}. */
+    @Override
+    public HRegionLocation getRegionLocation(final byte[] tableName,
+        final byte [] row, boolean reload)
+    throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return getRegionLocation(table, row, reload);
+    }
+
+    /** Asks the registry whether {@code tableName} is in the enabled state. */
+    @Override
+    public boolean isTableEnabled(TableName tableName) throws IOException {
+      final boolean wantEnabled = true;
+      return this.registry.isTableOnlineState(tableName, wantEnabled);
+    }
+
+    /** Byte-array table name variant of {@code isTableEnabled(TableName)}. */
+    @Override
+    public boolean isTableEnabled(byte[] tableName) throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return isTableEnabled(table);
+    }
+
+    /** Asks the registry whether {@code tableName} is in the disabled state. */
+    @Override
+    public boolean isTableDisabled(TableName tableName) throws IOException {
+      final boolean wantEnabled = false;
+      return this.registry.isTableOnlineState(tableName, wantEnabled);
+    }
+
+    /** Byte-array table name variant of {@code isTableDisabled(TableName)}. */
+    @Override
+    public boolean isTableDisabled(byte[] tableName) throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return isTableDisabled(table);
+    }
+
+    /**
+     * Checks whether every region of {@code tableName} recorded in hbase:meta (split parents
+     * excluded) has a server assigned.
+     * @param tableName table to check
+     * @return true if at least one region was found and none of them lacked a server
+     * @throws IOException if the meta scan fails
+     */
+    @Override
+    public boolean isTableAvailable(final TableName tableName) throws IOException {
+      // Mutable holders so the anonymous visitor below can report its findings.
+      final AtomicBoolean available = new AtomicBoolean(true);
+      final AtomicInteger regionCount = new AtomicInteger(0);
+      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
+        @Override
+        public boolean processRow(Result row) throws IOException {
+          HRegionInfo info = MetaScanner.getHRegionInfo(row);
+          if (info != null && !info.isSplitParent()) {
+            if (tableName.equals(info.getTable())) {
+              ServerName server = HRegionInfo.getServerName(row);
+              if (server == null) {
+                // One unassigned region is enough to make the table unavailable; stop here.
+                available.set(false);
+                return false;
+              }
+              regionCount.incrementAndGet();
+            } else if (tableName.compareTo(info.getTable()) < 0) {
+              // Return if we are done with the current table
+              return false;
+            }
+          }
+          // true keeps the scan going; false (above) ends it.
+          return true;
+        }
+      };
+      MetaScanner.metaScan(conf, this, visitor, tableName);
+      // A table with no regions in meta is not considered available.
+      return available.get() && (regionCount.get() > 0);
+    }
+
+    /** Byte-array table name variant of {@code isTableAvailable(TableName)}. */
+    @Override
+    public boolean isTableAvailable(final byte[] tableName) throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return isTableAvailable(table);
+    }
+
+    /**
+     * Checks whether {@code tableName} is available with the expected split layout: every
+     * region in hbase:meta (split parents excluded) must have a server, and the number of
+     * regions whose start key is empty or equal to one of {@code splitKeys} must be exactly
+     * {@code splitKeys.length + 1}.
+     * <p>
+     * NOTE(review): a null {@code splitKeys} throws NullPointerException, and duplicate split
+     * keys can never match the expected count. Confirm callers always pass distinct, non-null
+     * keys.
+     * @param tableName table to check
+     * @param splitKeys split keys the table is expected to have been created with
+     * @return true if the table's regions match the split keys and all are assigned
+     * @throws IOException if the meta scan fails
+     */
+    @Override
+    public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
+        throws IOException {
+      final AtomicBoolean available = new AtomicBoolean(true);
+      final AtomicInteger regionCount = new AtomicInteger(0);
+      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
+        @Override
+        public boolean processRow(Result row) throws IOException {
+          HRegionInfo info = MetaScanner.getHRegionInfo(row);
+          if (info != null && !info.isSplitParent()) {
+            if (tableName.equals(info.getTable())) {
+              ServerName server = HRegionInfo.getServerName(row);
+              if (server == null) {
+                // Unassigned region: unavailable, stop scanning.
+                available.set(false);
+                return false;
+              }
+              if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+                for (byte[] splitKey : splitKeys) {
+                  // Just check if the splitkey is available
+                  if (Bytes.equals(info.getStartKey(), splitKey)) {
+                    regionCount.incrementAndGet();
+                    break;
+                  }
+                }
+              } else {
+                // Always empty start row should be counted
+                regionCount.incrementAndGet();
+              }
+            } else if (tableName.compareTo(info.getTable()) < 0) {
+              // Return if we are done with the current table
+              return false;
+            }
+          }
+          return true;
+        }
+      };
+      MetaScanner.metaScan(conf, this, visitor, tableName);
+      // +1 needs to be added so that the empty start row is also taken into account
+      return available.get() && (regionCount.get() == splitKeys.length + 1);
+    }
+
+    /** Byte-array table name variant of {@code isTableAvailable(TableName, byte[][])}. */
+    @Override
+    public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
+        throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return isTableAvailable(table, splitKeys);
+    }
+
+    /**
+     * Locates a region by its full region name, bypassing the location cache.
+     * @param regionName encoded region name; table and start key are extracted from it
+     * @return the region's location, or null if none was found
+     */
+    @Override
+    public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
+      final TableName table = HRegionInfo.getTable(regionName);
+      final byte[] startKey = HRegionInfo.getStartKey(regionName);
+      final RegionLocations found = locateRegion(table, startKey, false, true);
+      if (found == null) {
+        return null;
+      }
+      return found.getRegionLocation();
+    }
+
+    /**
+     * @return true only if a cluster status listener is configured and it reports {@code sn}
+     *   as dead; without a listener every server is assumed alive
+     */
+    @Override
+    public boolean isDeadServer(ServerName sn) {
+      return clusterStatusListener != null && clusterStatusListener.isDeadServer(sn);
+    }
+
+    /** Locates all regions of {@code tableName}, bypassing the cache and including offlined ones. */
+    @Override
+    public List<HRegionLocation> locateRegions(final TableName tableName)
+    throws IOException {
+      final boolean useCache = false;
+      final boolean offlined = true;
+      return locateRegions(tableName, useCache, offlined);
+    }
+
+    /** Byte-array table name variant of {@code locateRegions(TableName)}. */
+    @Override
+    public List<HRegionLocation> locateRegions(final byte[] tableName)
+    throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return locateRegions(table);
+    }
+
+    /**
+     * Lists the table's regions from hbase:meta and locates each one by its start key.
+     * @param useCache whether per-region lookups may be answered from the location cache
+     * @param offlined passed to {@code MetaScanner.allTableRegions} to control whether offlined
+     *   regions are listed
+     * @return every non-null location returned for every region, in region order
+     */
+    @Override
+    public List<HRegionLocation> locateRegions(final TableName tableName,
+        final boolean useCache, final boolean offlined) throws IOException {
+      final NavigableMap<HRegionInfo, ServerName> tableRegions =
+          MetaScanner.allTableRegions(conf, this, tableName, offlined);
+      final List<HRegionLocation> result = new ArrayList<HRegionLocation>();
+      for (HRegionInfo region : tableRegions.keySet()) {
+        final RegionLocations found =
+            locateRegion(tableName, region.getStartKey(), useCache, true);
+        if (found == null) {
+          continue;
+        }
+        for (HRegionLocation loc : found.getRegionLocations()) {
+          if (loc == null) {
+            continue;
+          }
+          result.add(loc);
+        }
+      }
+      return result;
+    }
+
+    /** Byte-array table name variant of {@code locateRegions(TableName, boolean, boolean)}. */
+    @Override
+    public List<HRegionLocation> locateRegions(final byte[] tableName,
+       final boolean useCache, final boolean offlined) throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return locateRegions(table, useCache, offlined);
+    }
+
+    /**
+     * Locates the region holding {@code row}, allowing the location cache to answer.
+     * @return the region's location, or null if none was found
+     */
+    @Override
+    public HRegionLocation locateRegion(final TableName tableName,
+        final byte [] row)
+    throws IOException{
+      final RegionLocations found = locateRegion(tableName, row, true, true);
+      if (found == null) {
+        return null;
+      }
+      return found.getRegionLocation();
+    }
+
+    /** Byte-array table name variant of {@code locateRegion(TableName, byte[])}. */
+    @Override
+    public HRegionLocation locateRegion(final byte[] tableName,
+        final byte [] row)
+    throws IOException{
+      final TableName table = TableName.valueOf(tableName);
+      return locateRegion(table, row);
+    }
+
+    /**
+     * Locates the region holding {@code row} without using the location cache.
+     * @return the region's location, or null if none was found
+     * @throws TableNotEnabledException if the table is disabled
+     */
+    @Override
+    public HRegionLocation relocateRegion(final TableName tableName,
+        final byte [] row) throws IOException{
+      // This is an explicit no-cache request, so a disabled table should fail loudly here,
+      // the first time it is interacted with, rather than resolve to a location.
+      final boolean disabled = isTableDisabled(tableName);
+      if (disabled) {
+        throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
+      }
+      final RegionLocations found = locateRegion(tableName, row, false, true);
+      return (found != null) ? found.getRegionLocation() : null;
+    }
+
+    /** Byte-array table name variant of {@code relocateRegion(TableName, byte[])}. */
+    @Override
+    public HRegionLocation relocateRegion(final byte[] tableName,
+        final byte [] row) throws IOException {
+      final TableName table = TableName.valueOf(tableName);
+      return relocateRegion(table, row);
+    }
+
+
+    private RegionLocations locateRegion(final TableName tableName,
+      final byte [] row, boolean useCache, boolean retry)
+    throws IOException {
+      if (this.closed) throw new IOException(toString() + " closed");
+      if (tableName== null || tableName.getName().length == 0) {
+        throw new IllegalArgumentException(
+            "table name cannot be null or zero length");
+      }
+
+      if (tableName.equals(TableName.META_TABLE_NAME)) {
+        return this.registry.getMetaRegionLocation();
+      } else {
+        // Region not in the cache - have to go to the meta RS
+        return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
+          useCache, userRegionLock, retry);
+      }
+    }
+
+    /*
+     * Search hbase:meta for the HRegionLocation info that contains the table and
+     * row we're seeking. It will prefetch certain number of regions info and
+     * save them to the global region cache.
+     *
+     * The scan is given tableName, row and this.prefetchRegionLimit (presumably the start
+     * position and the maximum number of rows; confirm against MetaScanner.metaScan).
+     * Best effort: an IOException from the scan is logged at WARN and otherwise ignored.
+     * When it stems from an interrupt, the thread's interrupt flag is re-asserted instead.
+     * Callers must therefore still check the cache afterwards.
+     */
+    private void prefetchRegionCache(final TableName tableName,
+        final byte[] row) {
+      // Implement a new visitor for MetaScanner, and use it to walk through
+      // the hbase:meta
+      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
+        @Override
+        public boolean processRow(Result result) throws IOException {
+          try {
+            RegionLocations locations = MetaReader.getRegionLocations(result);
+            if (locations == null) {
+              // Nothing parsable in this row; skip it but keep scanning.
+              return true;
+            }
+            HRegionInfo regionInfo = locations.getRegionLocation().getRegionInfo();
+
+            // possible we got a region of a different table...
+            if (!regionInfo.getTable().equals(tableName)) {
+              return false; // stop scanning
+            }
+            if (regionInfo.isOffline()) {
+              // don't cache offline regions
+              return true;
+            }
+
+            ServerName serverName = locations.getRegionLocation().getServerName();
+            if (serverName == null) {
+              return true; // don't cache it
+            }
+
+            // cache this meta entry
+            cacheLocation(tableName, locations);
+            return true;
+          } catch (RuntimeException e) {
+            // processRow may only throw IOException, so unchecked failures are wrapped.
+            throw new IOException(e);
+          }
+        }
+      };
+      try {
+        // pre-fetch certain number of regions info at region cache.
+        MetaScanner.metaScan(conf, this, visitor, tableName, row,
+            this.prefetchRegionLimit, TableName.META_TABLE_NAME);
+      } catch (IOException e) {
+        if (ExceptionUtil.isInterrupt(e)) {
+          // Swallowed here, so preserve the interrupt for the caller to observe.
+          Thread.currentThread().interrupt();
+        } else {
+          LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
+        }
+      }
+    }
+
+    /**
+     * Searches the catalog table {@code parentTable} for the locations of the region of
+     * {@code tableName} that contains {@code row}, and caches what it finds.
+     * <p>
+     * With {@code useCache} set, the location cache is consulted first. When
+     * {@code parentTable} is hbase:meta and prefetch is enabled for the table, a batch of meta
+     * rows is prefetched into the cache under {@code regionLockObject}. Without
+     * {@code useCache}, any cached entry for {@code row} is evicted and meta is queried
+     * directly.
+     *
+     * @param parentTable the catalog table to search
+     * @param tableName the table whose region is wanted
+     * @param row the row the region must contain
+     * @param useCache whether the location cache may answer the request
+     * @param regionLockObject monitor serialising prefetches into the cache
+     * @param retry if false, exactly one attempt is made and its failure is rethrown as-is
+     * @return the region locations for the region containing {@code row}
+     * @throws TableNotFoundException if meta has no row, or only another table's row, for it
+     * @throws NoServerForRegionException if the attempts run out without finding a meta
+     *   location, or the region has no server listed on the last attempt
+     * @throws InterruptedIOException if interrupted while sleeping between attempts
+     * @throws IOException for other failures once the attempts are used up
+     */
+    private RegionLocations locateRegionInMeta(final TableName parentTable,
+      final TableName tableName, final byte [] row, boolean useCache,
+      Object regionLockObject, boolean retry)
+    throws IOException {
+      RegionLocations location;
+      // If we are supposed to be using the cache, look in the cache to see if
+      // we already have the region.
+      if (useCache) {
+        location = getCachedLocation(tableName, row);
+        if (location != null) {
+          return location;
+        }
+      }
+      // Attempt budget for THIS call. Every retry decision below must use it, not
+      // this.numTries. Otherwise a retry == false call swallows its only failure, sleeps,
+      // and then reports a misleading NoServerForRegionException.
+      int localNumRetries = retry ? numTries : 1;
+      // build the key of the meta region we should be looking for.
+      // the extra 9's on the end are necessary to allow "exact" matches
+      // without knowing the precise region names.
+      byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
+        HConstants.NINES, false);
+      for (int tries = 0; true; tries++) {
+        if (tries >= localNumRetries) {
+          throw new NoServerForRegionException("Unable to find region for "
+            + Bytes.toStringBinary(row) + " after " + localNumRetries + " tries.");
+        }
+
+        HRegionLocation metaLocation = null;
+        try {
+          // locate the meta region
+          RegionLocations metaLocations = locateRegion(parentTable, metaKey, true, false);
+          metaLocation = metaLocations == null ? null : metaLocations.getRegionLocation();
+          // If null still, go around again (no sleep on this path).
+          if (metaLocation == null) continue;
+          ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
+
+          Result regionInfoRow;
+          // This block guards against two threads trying to load the meta
+          // region at the same time. The first will load the meta region and
+          // the second will use the value that the first one found.
+          if (useCache) {
+            if (TableName.META_TABLE_NAME.equals(parentTable) &&
+                getRegionCachePrefetch(tableName)) {
+              synchronized (regionLockObject) {
+                // Check the cache again for a hit in case some other thread made the
+                // same query while we were waiting on the lock.
+                location = getCachedLocation(tableName, row);
+                if (location != null) {
+                  return location;
+                }
+                // If the parent table is META, we may want to pre-fetch some
+                // region info into the global region cache for this table.
+                prefetchRegionCache(tableName, row);
+              }
+            }
+            location = getCachedLocation(tableName, row);
+            if (location != null) {
+              return location;
+            }
+          } else {
+            // If we are not supposed to be using the cache, delete any existing cached location
+            // so it won't interfere.
+            metaCache.clearCache(tableName, row);
+          }
+
+          // Query the meta region for the location of the meta region
+          regionInfoRow = ProtobufUtil.getRowOrBefore(service,
+              metaLocation.getRegionInfo().getRegionName(), metaKey,
+              HConstants.CATALOG_FAMILY);
+
+          if (regionInfoRow == null) {
+            throw new TableNotFoundException(tableName);
+          }
+
+          // convert the row result into the HRegionLocation we need!
+          location = MetaReader.getRegionLocations(regionInfoRow);
+          // Guard each step of the chain. A meta row without usable location info must surface
+          // as the (retriable) IOException below, not as an uncaught NullPointerException.
+          HRegionLocation regionLocation =
+            location == null ? null : location.getRegionLocation();
+          HRegionInfo regionInfo =
+            regionLocation == null ? null : regionLocation.getRegionInfo();
+          if (regionInfo == null) {
+            throw new IOException("HRegionInfo was null or empty in " +
+              parentTable + ", row=" + regionInfoRow);
+          }
+
+          // possible we got a region of a different table...
+          if (!regionInfo.getTable().equals(tableName)) {
+            throw new TableNotFoundException(
+                  "Table '" + tableName + "' was not found, got: " +
+                  regionInfo.getTable() + ".");
+          }
+          if (regionInfo.isSplit()) {
+            throw new RegionOfflineException("the only available region for" +
+              " the required row is a split parent," +
+              " the daughters should be online soon: " +
+              regionInfo.getRegionNameAsString());
+          }
+          if (regionInfo.isOffline()) {
+            throw new RegionOfflineException("the region is offline, could" +
+              " be caused by a disable table call: " +
+              regionInfo.getRegionNameAsString());
+          }
+
+          ServerName serverName = regionLocation.getServerName();
+          if (serverName == null) {
+            throw new NoServerForRegionException("No server address listed " +
+              "in " + parentTable + " for region " +
+              regionInfo.getRegionNameAsString() + " containing row " +
+              Bytes.toStringBinary(row));
+          }
+
+          if (isDeadServer(serverName)){
+            throw new RegionServerStoppedException("hbase:meta says the region "+
+                regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
+                ", but it is dead.");
+          }
+
+          cacheLocation(tableName, location);
+          return location;
+        } catch (TableNotFoundException e) {
+          // if we got this error, probably means the table just plain doesn't
+          // exist. rethrow the error immediately. this should always be coming
+          // from the HTable constructor.
+          throw e;
+        } catch (IOException e) {
+          ExceptionUtil.rethrowIfInterrupt(e);
+
+          if (e instanceof RemoteException) {
+            e = ((RemoteException)e).unwrapRemoteException();
+          }
+          if (tries < localNumRetries - 1) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("locateRegionInMeta parentTable=" +
+                parentTable + ", metaLocation=" +
+                ((metaLocation == null)? "null": "{" + metaLocation + "}") +
+                ", attempt=" + tries + " of " +
+                localNumRetries + " failed; retrying after sleep of " +
+                ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
+            }
+          } else {
+            // Out of attempts for this call: surface the real failure.
+            throw e;
+          }
+          // Only relocate the parent region if necessary
+          if(!(e instanceof RegionOfflineException ||
+              e instanceof NoServerForRegionException)) {
+            relocateRegion(parentTable, metaKey);
+          }
+        }
+        try{
+          Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
+        } catch (InterruptedException e) {
+          // InterruptedIOException has no cause-taking constructor; attach it explicitly.
+          InterruptedIOException iioe = new InterruptedIOException(
+            "Giving up trying to location region in " +
+            "meta: thread is interrupted.");
+          iioe.initCause(e);
+          throw iioe;
+        }
+      }
+    }
+
+    /**
+     * Stores newly discovered region locations in the meta cache.
+     * @param tableName The table name.
+     * @param location the locations to cache
+     */
+    private void cacheLocation(final TableName tableName, final RegionLocations location) {
+      this.metaCache.cacheLocation(tableName, location);
+    }
+
+    /**
+     * Search the cache for a location that fits our table and row key.
+     * Return null if no suitable region is located.
+     *
+     * @param tableName
+     * @param row
+     * @return Null or region location found in cache.
+     */
+    RegionLocations getCachedLocation(final TableName tableName,
+        final byte [] row) {
+      return metaCache.getCachedLocation(tableName, row);
+    }
+
+
+    /**
+     * Removes whatever location is cached for {@code row} of {@code tableName}, regardless of
+     * its value. Used when a caller has asked not to use the cache.
+     * @param tableName tableName
+     * @param row row whose cached location is dropped
+     */
+    public void clearRegionCache(final TableName tableName, byte[] row) {
+      this.metaCache.clearCache(tableName, row);
+    }
+
+    /**
+     * Deletes all cached entries that map to {@code serverName}, across every table.
+     */
+    @Override
+    public void clearCaches(final ServerName serverName) {
+      this.metaCache.clearCache(serverName);
+    }
+
+    /** Drops every entry in the region location cache. */
+    @Override
+    public void clearRegionCache() {
+      this.metaCache.clearCache();
+    }
+
+    /** Drops every cached region location belonging to {@code tableName}. */
+    @Override
+    public void clearRegionCache(final TableName tableName) {
+      this.metaCache.clearCache(tableName);
+    }
+
+    /** Byte-array table name variant of {@code clearRegionCache(TableName)}. */
+    @Override
+    public void clearRegionCache(final byte[] tableName) {
+      final TableName table = TableName.valueOf(tableName);
+      clearRegionCache(table);
+    }
+
+    /**
+     * Stores a newly discovered HRegionLocation in the meta cache.
+     * @param tableName The table name.
+     * @param source the server that reported the location, if it did not come from meta
+     * @param location the new location
+     */
+    private void cacheLocation(final TableName tableName, final ServerName source,
+        final HRegionLocation location) {
+      this.metaCache.cacheLocation(tableName, source, location);
+    }
+
+    // Map keyed by service name + regionserver to service stub implementation.
+    // Keys come from getStubKey(serviceName, hostAndPort), i.e. "<service>@<host:port>".
+    private final ConcurrentHashMap<String, Object> stubs =
+      new ConcurrentHashMap<String, Object>();
+    // Map of locks used creating service stubs per regionserver.
+    // Each key maps to itself (callers do putIfAbsent(key, key)). The canonical String stored
+    // here is then used as the monitor, so concurrent creators of one stub share a lock.
+    private final ConcurrentHashMap<String, String> connectionLock =
+      new ConcurrentHashMap<String, String>();
+
+    /**
+     * State of the MasterService connection/setup: the owning connection, the current stub, a
+     * user count, and the time after which the stub may be closed for inactivity.
+     */
+    static class MasterServiceState {
+      HConnection connection;
+      MasterService.BlockingInterface stub;
+      // NOTE(review): maintained by code outside this view; presumably the number of holders.
+      int userCount;
+      // Compared against System.currentTimeMillis() by the delayed-closing chore.
+      long keepAliveUntil = Long.MAX_VALUE;
+
+      MasterServiceState(final HConnection connection) {
+        super();
+        this.connection = connection;
+      }
+
+      @Override
+      public String toString() {
+        return "MasterService";
+      }
+
+      Object getStub() {
+        return stub;
+      }
+
+      void clearStub() {
+        stub = null;
+      }
+
+      /**
+       * Issues an isMasterRunning RPC on the current stub.
+       * @return the master's answer, or false if the response was null
+       */
+      boolean isMasterRunning() throws ServiceException {
+        final IsMasterRunningResponse answer =
+          stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+        if (answer == null) {
+          return false;
+        }
+        return answer.getIsMasterRunning();
+      }
+    }
+
+    /**
+     * Makes a client-side stub for master services. Sub-class to specialize.
+     * Depends on hosting class so not static.  Exists so we avoid duplicating a bunch of code
+     * when setting up the MasterMonitorService and MasterAdminService.
+     */
+    abstract class StubMaker {
+      /**
+       * Returns the name of the service stub being created; used to build the stub cache key.
+       */
+      protected abstract String getServiceName();
+
+      /**
+       * Make stub and cache it internal so can be used later doing the isMasterRunning call.
+       * @param channel RPC channel to the active master
+       * @return the new stub
+       */
+      protected abstract Object makeStub(final BlockingRpcChannel channel);
+
+      /**
+       * Once setup, check it works by doing isMasterRunning check.
+       * @throws ServiceException if the check RPC fails
+       */
+      protected abstract void isMasterRunning() throws ServiceException;
+
+      /**
+       * Create a stub. Try once only.  It is not typed because there is no common type to
+       * protobuf services nor their interfaces.  Let the caller do appropriate casting.
+       * <p>
+       * The master address is read from ZooKeeper. A freshly created stub is only cached in
+       * {@code stubs} after its isMasterRunning check succeeds. The keep-alive ZooKeeper
+       * watcher is always closed before returning.
+       * @return A stub for master services.
+       * @throws IOException if ZooKeeper cannot be reached, or no live master is registered
+       * @throws KeeperException on ZooKeeper errors while reading the master address
+       * @throws ServiceException if the isMasterRunning check fails
+       */
+      private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
+        ZooKeeperKeepAliveConnection zkw;
+        try {
+          zkw = getKeepAliveZooKeeperWatcher();
+        } catch (IOException e) {
+          ExceptionUtil.rethrowIfInterrupt(e);
+          throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
+        }
+        try {
+          checkIfBaseNodeAvailable(zkw);
+          ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
+          if (sn == null) {
+            String msg = "ZooKeeper available but no active master location found";
+            LOG.info(msg);
+            throw new MasterNotRunningException(msg);
+          }
+          if (isDeadServer(sn)) {
+            throw new MasterNotRunningException(sn + " is dead.");
+          }
+          // Use the security info interface name as our stub key
+          String key = getStubKey(getServiceName(), sn.getHostAndPort());
+          connectionLock.putIfAbsent(key, key);
+          Object stub = null;
+          synchronized (connectionLock.get(key)) {
+            stub = stubs.get(key);
+            if (stub == null) {
+              BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
+                user, rpcTimeout);
+              stub = makeStub(channel);
+              // Verify before caching so a broken stub is never handed out later.
+              isMasterRunning();
+              stubs.put(key, stub);
+            }
+          }
+          return stub;
+        } finally {
+          zkw.close();
+        }
+      }
+
+      /**
+       * Create a stub against the master.  Retry if necessary.
+       * <p>
+       * Makes up to {@code numTries} attempts, sleeping with back-off between them. It gives
+       * up early on an interrupt-derived failure, or when this connection is closed.
+       * @return A stub to do <code>intf</code> against the master
+       * @throws MasterNotRunningException if every attempt failed, the thread was interrupted,
+       *   or the connection was closed while trying
+       */
+      @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
+      Object makeStub() throws MasterNotRunningException {
+        // The lock must be at the beginning to prevent multiple master creations
+        //  (and leaks) in a multithread context
+        synchronized (masterAndZKLock) {
+          Object stub = null;
+          int tries = 0;
+          while (!closed && stub == null) {
+            tries++;
+            // Scoped to this attempt. A failure left over from an earlier attempt must not be
+            // treated as a failure of this one. Otherwise a later success would still sleep and
+            // log a failure, and a success on the final attempt would be reported as
+            // MasterNotRunningException.
+            Exception exceptionCaught = null;
+            try {
+              stub = makeStubNoRetries();
+            } catch (IOException e) {
+              exceptionCaught = e;
+            } catch (KeeperException e) {
+              exceptionCaught = e;
+            } catch (ServiceException e) {
+              exceptionCaught = e;
+            }
+
+            if (exceptionCaught != null) {
+              // It failed. If it's not the last try, we're going to wait a little
+              if (tries < numTries && !ExceptionUtil.isInterrupt(exceptionCaught)) {
+                // tries at this point is 1 or more; decrement to start from 0.
+                long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
+                LOG.info("getMaster attempt " + tries + " of " + numTries +
+                    " failed; retrying after sleep of " + pauseTime + ", exception=" +
+                  exceptionCaught);
+
+                try {
+                  Thread.sleep(pauseTime);
+                } catch (InterruptedException e) {
+                  throw new MasterNotRunningException(
+                      "Thread was interrupted while trying to connect to master.", e);
+                }
+              } else {
+                // Enough tries, we stop now
+                LOG.info("getMaster attempt " + tries + " of " + numTries +
+                    " failed; no more retrying.", exceptionCaught);
+                throw new MasterNotRunningException(exceptionCaught);
+              }
+            }
+          }
+
+          if (stub == null) {
+            // implies this.closed true
+            throw new MasterNotRunningException("Connection was closed while trying to get master");
+          }
+          return stub;
+        }
+      }
+    }
+
+    /**
+     * {@link StubMaker} that produces {@link MasterService} blocking stubs.
+     */
+    class MasterServiceStubMaker extends StubMaker {
+      // Last stub built by makeStub(channel); the isMasterRunning check goes through it.
+      private MasterService.BlockingInterface stub;
+
+      @Override
+      protected String getServiceName() {
+        return MasterService.getDescriptor().getName();
+      }
+
+      @Override
+      @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD")
+      MasterService.BlockingInterface makeStub() throws MasterNotRunningException {
+        final Object untyped = super.makeStub();
+        return (MasterService.BlockingInterface) untyped;
+      }
+
+      @Override
+      protected Object makeStub(BlockingRpcChannel channel) {
+        final MasterService.BlockingInterface created = MasterService.newBlockingStub(channel);
+        this.stub = created;
+        return created;
+      }
+
+      @Override
+      protected void isMasterRunning() throws ServiceException {
+        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      }
+    }
+
+    /** Returns the (cached) admin stub for {@code serverName}; see the two-argument overload. */
+    @Override
+    public AdminService.BlockingInterface getAdmin(final ServerName serverName)
+        throws IOException {
+      final boolean master = false;
+      return getAdmin(serverName, master);
+    }
+
+    /**
+     * Returns the admin stub for {@code serverName}, creating and caching it on first use.
+     * @throws RegionServerStoppedException if the server is known to be dead
+     */
+    @Override
+    // Nothing is done w/ the 'master' parameter.  It is ignored.
+    public AdminService.BlockingInterface getAdmin(final ServerName serverName,
+      final boolean master)
+    throws IOException {
+      if (isDeadServer(serverName)) {
+        throw new RegionServerStoppedException(serverName + " is dead.");
+      }
+      final String stubKey = getStubKey(AdminService.BlockingInterface.class.getName(),
+        serverName.getHostAndPort());
+      this.connectionLock.putIfAbsent(stubKey, stubKey);
+      final Object lock = this.connectionLock.get(stubKey);
+      synchronized (lock) {
+        final AdminService.BlockingInterface cached =
+          (AdminService.BlockingInterface) this.stubs.get(stubKey);
+        if (cached != null) {
+          return cached;
+        }
+        final BlockingRpcChannel channel =
+          this.rpcClient.createBlockingRpcChannel(serverName, user, this.rpcTimeout);
+        final AdminService.BlockingInterface created = AdminService.newBlockingStub(channel);
+        this.stubs.put(stubKey, created);
+        return created;
+      }
+    }
+
+    /**
+     * Returns the client stub for {@code sn}, creating and caching it on first use. No RPC is
+     * made here; a bad server surfaces on the first real call instead.
+     * @throws RegionServerStoppedException if the server is known to be dead
+     */
+    @Override
+    public ClientService.BlockingInterface getClient(final ServerName sn)
+    throws IOException {
+      if (isDeadServer(sn)) {
+        throw new RegionServerStoppedException(sn + " is dead.");
+      }
+      final String stubKey =
+        getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
+      this.connectionLock.putIfAbsent(stubKey, stubKey);
+      final Object lock = this.connectionLock.get(stubKey);
+      synchronized (lock) {
+        final ClientService.BlockingInterface cached =
+          (ClientService.BlockingInterface) this.stubs.get(stubKey);
+        if (cached != null) {
+          return cached;
+        }
+        final BlockingRpcChannel channel =
+          this.rpcClient.createBlockingRpcChannel(sn, user, this.rpcTimeout);
+        final ClientService.BlockingInterface created = ClientService.newBlockingStub(channel);
+        this.stubs.put(stubKey, created);
+        return created;
+      }
+    }
+
+    /**
+     * Builds the key under which a service stub is cached.
+     * @return {@code serviceName + "@" + rsHostnamePort}
+     */
+    static String getStubKey(final String serviceName, final String rsHostnamePort) {
+      return new StringBuilder().append(serviceName).append('@').append(rsHostnamePort)
+        .toString();
+    }
+
+    // Shared watcher handed out by getKeepAliveZooKeeperWatcher(); created lazily there.
+    private ZooKeeperKeepAliveConnection keepAliveZookeeper;
+    // Incremented on every getKeepAliveZooKeeperWatcher(), decremented on every
+    // releaseZooKeeperWatcher(); guarded by masterAndZKLock.
+    private int keepAliveZookeeperUserCount;
+    // Checked by the DelayedClosing chore before it closes an expired ZooKeeper watcher.
+    private boolean canCloseZKW = true;
+
+    // keepAlive time, in ms. No reason to make it configurable.
+    // Added to the current time when the last user releases the watcher.
+    private static final long keepAlive = 5 * 60 * 1000;
+
+    /**
+     * Retrieve a shared ZooKeeperWatcher, creating it on first use, and register the caller as
+     * one more user of it. You must close it once you have finished with it.
+     * @return The shared instance. Never returns null.
+     * @throws IOException if the watcher has to be created but this connection is closed
+     */
+    ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
+      throws IOException {
+      synchronized (masterAndZKLock) {
+        if (keepAliveZookeeper == null) {
+          if (closed) {
+            throw new IOException(toString() + " closed");
+          }
+          // The link to ZooKeeper is not validated here; ZooKeeperWatcher retries on its own.
+          keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, toString(), this);
+        }
+        keepAliveZookeeperUserCount += 1;
+        // With a user holding it, the watcher must not expire until it is released again.
+        keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+        return keepAliveZookeeper;
+      }
+    }
+
+    /**
+     * Drops one user of the shared ZooKeeper watcher. When no users remain, the watcher is
+     * scheduled to expire {@code keepAlive} ms from now.
+     * NOTE(review): {@code zkw} is only null-checked; it is not compared with the shared
+     * instance.
+     * @param zkw the watcher being released; a null value is ignored
+     */
+    void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
+      if (zkw != null) {
+        synchronized (masterAndZKLock) {
+          keepAliveZookeeperUserCount -= 1;
+          if (keepAliveZookeeperUserCount <= 0) {
+            keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
+          }
+        }
+      }
+    }
+
+    /**
+     * Chore that checks the connections to master & zookeeper once a minute and closes
+     * them when they reach their closing time ({@link MasterServiceState#keepAliveUntil} and
+     * {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is managed by the release
+     * functions and the variable {@link #keepAlive}.
+     * <p>
+     * NOTE(review): despite its name, {@link #createAndStart} only constructs the chore and
+     * does not run its thread; confirm that the caller starts it.
+     */
+    private static class DelayedClosing extends Chore implements Stoppable {
+      /** Connection whose idle master stub and ZooKeeper watcher are closed. */
+      private final HConnectionImplementation hci;
+      /** Stop flag; also handed to the Chore superclass as its stopper. */
+      final Stoppable stoppable;
+
+      private DelayedClosing(
+        HConnectionImplementation hci, Stoppable stoppable){
+        super(
+          "ZooKeeperWatcher and Master delayed closing for connection "+hci,
+          60*1000, // Period in ms: check once a minute.
+          stoppable);
+        this.hci = hci;
+        this.stoppable = stoppable;
+      }
+
+      /**
+       * Builds a chore for the given connection, backed by its own volatile stop flag.
+       */
+      static DelayedClosing createAndStart(HConnectionImplementation hci){
+        Stoppable stoppable = new Stoppable() {
+              private volatile boolean isStopped = false;
+              @Override public void stop(String why) { isStopped = true;}
+              @Override public boolean isStopped() {return isStopped;}
+            };
+
+        return new DelayedClosing(hci, stoppable);
+      }
+
+      /**
+       * Closes the given master service state if its keep-alive deadline has passed, then
+       * pushes the deadline to Long.MAX_VALUE until a later release sets it again.
+       */
+      protected void closeMasterProtocol(MasterServiceState protocolState) {
+        if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
+          hci.closeMasterService(protocolState);
+          protocolState.keepAliveUntil = Long.MAX_VALUE;
+        }
+      }
+
+      @Override
+      protected void chore() {
+        synchronized (hci.masterAndZKLock) {
+          if (hci.canCloseZKW &&
+              System.currentTimeMillis() > hci.keepZooKeeperWatcherAliveUntil) {
+            hci.closeZooKeeperWatcher();
+            hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+          }
+          // The connection shares a single master service state, so one check is enough.
+          closeMasterProtocol(hci.masterServiceState);
+        }
+      }
+
+      @Override
+      public void stop(String why) {
+        stoppable.stop(why);
+      }
+
+      @Override
+      public boolean isStopped() {
+        return stoppable.isStopped();
+      }
+    }
+
+    private void closeZooKeeperWatcher() {
+      synchronized (masterAndZKLock) {
+        if (keepAliveZookeeper != null) {
+          LOG.info("Closing zookeeper sessionid=0x" +
+            Long.toHexString(
+              keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
+          keepAliveZookeeper.internalClose();
+          keepAliveZookeeper = null;
+        }
+        keepAliveZookeeperUserCount = 0;
+      }
+    }
+
+    // State of the single shared master service stub handed out by getKeepAliveMasterService();
+    // its stub, user count and keep-alive deadline are updated under masterAndZKLock.
+    final MasterServiceState masterServiceState = new MasterServiceState(this);
+
+    /**
+     * Returns the same keep-alive wrapper as {@link #getKeepAliveMasterService()}; callers
+     * should close it when done so the shared stub can be released.
+     */
+    @Override
+    public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
+      final MasterKeepAliveConnection master = getKeepAliveMasterService();
+      return master;
+    }
+
+    /**
+     * Registers one more user of the given master service state and cancels any pending
+     * delayed close. The visible caller invokes it while holding masterAndZKLock.
+     */
+    private void resetMasterServiceState(final MasterServiceState mss) {
+      mss.keepAliveUntil = Long.MAX_VALUE;
+      mss.userCount += 1;
+    }
+
+    /**
+     * Returns a wrapper around the shared master service stub. The stub is (re)created first
+     * if there is none or its master no longer answers as running. Each call registers one
+     * more user of the shared stub. Closing the returned wrapper releases that use; once the
+     * stub is idle, the delayed-closing chore may close it after {@link #keepAlive} ms.
+     * @return a delegating wrapper whose {@code close()} releases this use of the stub
+     * @throws MasterNotRunningException if a new stub has to be made and that fails
+     */
+    @Override
+    public MasterKeepAliveConnection getKeepAliveMasterService()
+    throws MasterNotRunningException {
+      final MasterService.BlockingInterface stub;
+      synchronized (masterAndZKLock) {
+        if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
+          MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
+          this.masterServiceState.stub = stubMaker.makeStub();
+        }
+        resetMasterServiceState(this.masterServiceState);
+        // Capture the stub while still holding the lock. Once the lock is released, a
+        // concurrent closeMaster()/closeMasterService() may clear masterServiceState.stub,
+        // and reading it afterwards could hand the wrapper a null stub.
+        stub = this.masterServiceState.stub;
+      }
+      // Ugly delegation just so we can add in a Close method.
+      return new MasterKeepAliveConnection() {
+        MasterServiceState mss = masterServiceState;
+        @Override
+        public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
+        throws ServiceException {
+          return stub.addColumn(controller, request);
+        }
+
+        @Override
+        public DeleteColumnResponse deleteColumn(RpcController controller,
+            DeleteColumnRequest request)
+        throws ServiceException {
+          return stub.deleteColumn(controller, request);
+        }
+
+        @Override
+        public ModifyColumnResponse modifyColumn(RpcController controller,
+            ModifyColumnRequest request)
+        throws ServiceException {
+          return stub.modifyColumn(controller, request);
+        }
+
+        @Override
+        public MoveRegionResponse moveRegion(RpcController controller,
+            MoveRegionRequest request) throws ServiceException {
+          return stub.moveRegion(controller, request);
+        }
+
+        @Override
+        public DispatchMergingRegionsResponse dispatchMergingRegions(
+            RpcController controller, DispatchMergingRegionsRequest request)
+            throws ServiceException {
+          return stub.dispatchMergingRegions(controller, request);
+        }
+
+        @Override
+        public AssignRegionResponse assignRegion(RpcController controller,
+            AssignRegionRequest request) throws ServiceException {
+          return stub.assignRegion(controller, request);
+        }
+
+        @Override
+        public UnassignRegionResponse unassignRegion(RpcController controller,
+            UnassignRegionRequest request) throws ServiceException {
+          return stub.unassignRegion(controller, request);
+        }
+
+        @Override
+        public OfflineRegionResponse offlineRegion(RpcController controller,
+            OfflineRegionRequest request) throws ServiceException {
+          return stub.offlineRegion(controller, request);
+        }
+
+        @Override
+        public DeleteTableResponse deleteTable(RpcController controller,
+            DeleteTableRequest request) throws ServiceException {
+          return stub.deleteTable(controller, request);
+        }
+
+        @Override
+        public EnableTableResponse enableTable(RpcController controller,
+            EnableTableRequest request) throws ServiceException {
+          return stub.enableTable(controller, request);
+        }
+
+        @Override
+        public DisableTableResponse disableTable(RpcController controller,
+            DisableTableRequest request) throws ServiceException {
+          return stub.disableTable(controller, request);
+        }
+
+        @Override
+        public ModifyTableResponse modifyTable(RpcController controller,
+            ModifyTableRequest request) throws ServiceException {
+          return stub.modifyTable(controller, request);
+        }
+
+        @Override
+        public CreateTableResponse createTable(RpcController controller,
+            CreateTableRequest request) throws ServiceException {
+          return stub.createTable(controller, request);
+        }
+
+        @Override
+        public ShutdownResponse shutdown(RpcController controller,
+            ShutdownRequest request) throws ServiceException {
+          return stub.shutdown(controller, request);
+        }
+
+        @Override
+        public StopMasterResponse stopMaster(RpcController controller,
+            StopMasterRequest request) throws ServiceException {
+          return stub.stopMaster(controller, request);
+        }
+
+        @Override
+        public BalanceResponse balance(RpcController controller,
+            BalanceRequest request) throws ServiceException {
+          return stub.balance(controller, request);
+        }
+
+        @Override
+        public SetBalancerRunningResponse setBalancerRunning(
+            RpcController controller, SetBalancerRunningRequest request)
+            throws ServiceException {
+          return stub.setBalancerRunning(controller, request);
+        }
+
+        @Override
+        public RunCatalogScanResponse runCatalogScan(RpcController controller,
+            RunCatalogScanRequest request) throws ServiceException {
+          return stub.runCatalogScan(controller, request);
+        }
+
+        @Override
+        public EnableCatalogJanitorResponse enableCatalogJanitor(
+            RpcController controller, EnableCatalogJanitorRequest request)
+            throws ServiceException {
+          return stub.enableCatalogJanitor(controller, request);
+        }
+
+        @Override
+        public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
+            RpcController controller, IsCatalogJanitorEnabledRequest request)
+            throws ServiceException {
+          return stub.isCatalogJanitorEnabled(controller, request);
+        }
+
+        @Override
+        public CoprocessorServiceResponse execMasterService(
+            RpcController controller, CoprocessorServiceRequest request)
+            throws ServiceException {
+          return stub.execMasterService(controller, request);
+        }
+
+        @Override
+        public SnapshotResponse snapshot(RpcController controller,
+            SnapshotRequest request) throws ServiceException {
+          return stub.snapshot(controller, request);
+        }
+
+        @Override
+        public GetCompletedSnapshotsResponse getCompletedSnapshots(
+            RpcController controller, GetCompletedSnapshotsRequest request)
+            throws ServiceException {
+          return stub.getCompletedSnapshots(controller, request);
+        }
+
+        @Override
+        public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
+            DeleteSnapshotRequest request) throws ServiceException {
+          return stub.deleteSnapshot(controller, request);
+        }
+
+        @Override
+        public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
+            IsSnapshotDoneRequest request) throws ServiceException {
+          return stub.isSnapshotDone(controller, request);
+        }
+
+        @Override
+        public RestoreSnapshotResponse restoreSnapshot(
+            RpcController controller, RestoreSnapshotRequest request)
+            throws ServiceException {
+          return stub.restoreSnapshot(controller, request);
+        }
+
+        @Override
+        public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
+            RpcController controller, IsRestoreSnapshotDoneRequest request)
+            throws ServiceException {
+          return stub.isRestoreSnapshotDone(controller, request);
+        }
+
+        @Override
+        public ExecProcedureResponse execProcedure(
+            RpcController controller, ExecProcedureRequest request)
+            throws ServiceException {
+          return stub.execProcedure(controller, request);
+        }
+
+        @Override
+        public IsProcedureDoneResponse isProcedureDone(RpcController controller,
+            IsProcedureDoneRequest request) throws ServiceException {
+          return stub.isProcedureDone(controller, request);
+        }
+
+        @Override
+        public IsMasterRunningResponse isMasterRunning(
+            RpcController controller, IsMasterRunningRequest request)
+            throws ServiceException {
+          return stub.isMasterRunning(controller, request);
+        }
+
+        @Override
+        public ModifyNamespaceResponse modifyNamespace(RpcController controller,
+            ModifyNamespaceRequest request)
+        throws ServiceException {
+          return stub.modifyNamespace(controller, request);
+        }
+
+        @Override
+        public CreateNamespaceResponse createNamespace(
+            RpcController controller, CreateNamespaceRequest request) throws ServiceException {
+          return stub.createNamespace(controller, request);
+        }
+
+        @Override
+        public DeleteNamespaceResponse deleteNamespace(
+            RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
+          return stub.deleteNamespace(controller, request);
+        }
+
+        @Override
+        public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
+            GetNamespaceDescriptorRequest request) throws ServiceException {
+          return stub.getNamespaceDescriptor(controller, request);
+        }
+
+        @Override
+        public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
+            ListNamespaceDescriptorsRequest request) throws ServiceException {
+          return stub.listNamespaceDescriptors(controller, request);
+        }
+
+        @Override
+        public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
+            RpcController controller, ListTableDescriptorsByNamespaceRequest request)
+                throws ServiceException {
+          return stub.listTableDescriptorsByNamespace(controller, request);
+        }
+
+        @Override
+        public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
+            RpcController controller, ListTableNamesByNamespaceRequest request)
+                throws ServiceException {
+          return stub.listTableNamesByNamespace(controller, request);
+        }
+
+        /** Releases this use of the shared master stub; the wrapper must not be used after. */
+        @Override
+        public void close() {
+          release(this.mss);
+        }
+
+        @Override
+        public GetSchemaAlterStatusResponse getSchemaAlterStatus(
+            RpcController controller, GetSchemaAlterStatusRequest request)
+            throws ServiceException {
+          return stub.getSchemaAlterStatus(controller, request);
+        }
+
+        @Override
+        public GetTableDescriptorsResponse getTableDescriptors(
+            RpcController controller, GetTableDescriptorsRequest request)
+            throws ServiceException {
+          return stub.getTableDescriptors(controller, request);
+        }
+
+        @Override
+        public GetTableNamesResponse getTableNames(
+            RpcController controller, GetTableNamesRequest request)
+            throws ServiceException {
+          return stub.getTableNames(controller, request);
+        }
+
+        @Override
+        public GetClusterStatusResponse getClusterStatus(
+            RpcController controller, GetClusterStatusRequest request)
+            throws ServiceException {
+          return stub.getClusterStatus(controller, request);
+        }
+      };
+    }
+
+
+    private static void release(MasterServiceState mss) {
+      if (mss != null && mss.connection != null) {
+        ((HConnectionImplementation)mss.connection).releaseMaster(mss);
+      }
+    }
+
+    /**
+     * Tells whether the given master service state holds a stub whose master answers as
+     * running. A missing stub, or any failure while asking, is reported as "not running".
+     */
+    private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
+      boolean running = false;
+      if (mss.getStub() != null) {
+        try {
+          running = mss.isMasterRunning();
+        } catch (UndeclaredThrowableException e) {
+          // Undeclared exceptions (e.g. java.net.ConnectException) can arrive wrapped in an
+          // UndeclaredThrowableException, so they are caught and unwrapped here.
+          LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
+        } catch (ServiceException se) {
+          LOG.warn("Checking master connection", se);
+        }
+      }
+      return running;
+    }
+
+    /**
+     * Releases one use of the shared master service. When the last user releases it, the
+     * stub becomes eligible for closing by the delayed-closing chore {@link #keepAlive} ms
+     * from now. Releasing a state whose stub was already cleared does nothing.
+     * @param mss the master service state being released
+     */
+    void releaseMaster(MasterServiceState mss) {
+      synchronized (masterAndZKLock) {
+        // Checked under the lock because closeMasterService() clears the stub while holding it.
+        if (mss.getStub() == null) {
+          return;
+        }
+        // closeMasterService() resets userCount to 0 even while wrappers are still out. A late
+        // release from such a wrapper must not drive the count negative; otherwise later users
+        // would be undercounted and the stub could be closed while still in use.
+        if (mss.userCount > 0) {
+          --mss.userCount;
+        }
+        if (mss.userCount == 0) {
+          mss.keepAliveUntil = System.currentTimeMillis() + keepAlive;
+        }
+      }
+    }
+
+    private void closeMasterService(MasterServiceState mss) {
+      if (mss.getStub() != null) {
+        LOG.info("Closing master protocol: " + mss);
+        mss.clearStub();
+      }
+      mss.userCount = 0;
+    }
+
+    /**
+     * Immediate close of the shared master. Can be by the delayed close or when closing the
+     * connection itself.
+     */
+    private void closeMaster() {
+      synchronized (masterAndZKLock) {
+        closeMasterService(masterServiceState);
+      }
+    }
+
+    void updateCachedLocation(HRegionInfo hri, ServerName source,
+                              ServerName serverName, long seqNum) {
+      HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
+      cacheLocation(hri.getTable(), source, newHrl);
+    }
+
+    /** Drops the given location from the region location cache. */
+    @Override
+    public void deleteCachedRegionLocation(final HRegionLocation location) {
+      this.metaCache.clearCache(location);
+    }
+
+    /**
+     * Variant without a region name: delegates with a null region name, so (after the
+     * argument checks) only the cache entry for {@code rowkey} on {@code source} is dropped.
+     */
+    @Override
+    public void updateCachedLocations(final TableName tableName, byte[] rowkey,
+      final Object exception, final ServerName source) {
+      final byte[] unknownRegion = null;
+      updateCachedLocations(tableName, unknownRegion, rowkey, exception, source);
+    }
+
+    /**
+     * Variant taking the failed location: its region name and server are passed on.
+     * {@code source} must not be null.
+     */
+    @Override
+    public void updateCachedLocations(final TableName tableName, byte[] rowkey,
+      final Object exception, final HRegionLocation source) {
+      assert source != null;
+      final byte[] regionName = source.getRegionInfo().getRegionName();
+      final ServerName server = source.getServerName();
+      updateCachedLocations(tableName, regionName, rowkey, exception, server);
+    }
+
+    /**
+     * Update the location with the new value (if the exception is a RegionMovedException)
+     * or delete it from the cache. Does nothing if we can be sure from the exception that
+     * the location is still accurate, or if the cache has already been updated.
+     * @param tableName table of the region; must not be null (a warning is logged otherwise)
+     * @param regionName name of the region the failure is about, or null if unknown, in which
+     *                   case only the cache entry for {@code rowkey} on {@code source} is dropped
+     * @param rowkey row the failed operation was about; must not be null (a warning is
+     *               logged otherwise)
+     * @param exception an object (to simplify user code) on which we will try to find a nested
+     *                  or wrapped or both RegionMovedException
+     * @param source server that is the source of the location update; null is ignored
+     */
+    @Override
+    public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
+      final Object exception, final ServerName source) {
+      if (rowkey == null || tableName == null) {
+        // Render the row key readably: concatenating a byte[] only prints its identity hash.
+        LOG.warn("Coding error, see method javadoc. row=" +
+            (rowkey == null ? "null" : org.apache.hadoop.hbase.util.Bytes.toStringBinary(rowkey)) +
+            ", tableName=" + tableName);
+        return;
+      }
+
+      if (source == null) {
+        // This should not happen, but let's secure ourselves.
+        return;
+      }
+
+      if (regionName == null) {
+        // we do not know which region, so just remove the cache entry for the row and server
+        metaCache.clearCache(tableName, rowkey, source);
+        return;
+      }
+
+      // Is it something we have already updated?
+      final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
+      HRegionLocation oldLocation = null;
+      if (oldLocations != null) {
+        oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
+      }
+      if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
+        // There is no such location in the cache (it's been removed already) or
+        // the cache has already been refreshed with a different location.  => nothing to do
+        return;
+      }
+
+      HRegionInfo regionInfo = oldLocation.getRegionInfo();
+      Throwable cause = findException(exception);
+      if (cause != null) {
+        if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
+          // We know that the region is still on this region server
+          return;
+        }
+
+        if (cause instanceof RegionMovedException) {
+          RegionMovedException rme = (RegionMovedException) cause;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
+                rme.getHostname() + ":" + rme.getPort() +
+                " according to " + source.getHostAndPort());
+          }
+          // We know that the region is not anymore on this region server, but we know
+          //  the new location.
+          updateCachedLocation(
+              regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
+          return;
+        }
+      }
+
+      // If we're here, we cannot be sure about the location, so we remove it from the
+      // cache. Do not send the source because source can be a new server in the same host:port
+      metaCache.clearCache(regionInfo);
+    }
+
+    /** Variant taking the table name as raw bytes; converts it and delegates. */
+    @Override
+    public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
+      final Object exception, final HRegionLocation source) {
+      final TableName table = TableName.valueOf(tableName);
+      updateCachedLocations(table, rowkey, exception, source);
+    }
+
+    /**
+     * Runs the batch without a callback and waits for it to finish.
+     * @throws IllegalArgumentException if {@code results} and {@code list} differ in size
+     */
+    @Override
+    @Deprecated
+    public void processBatch(List<? extends Row> list,
+        final TableName tableName,
+        ExecutorService pool,
+        Object[] results) throws IOException, InterruptedException {
+      // This belongs in HTable!!! Not in here.  St.Ack
+
+      // Exactly one result slot is required per action.
+      final boolean sameSize = results.length == list.size();
+      if (!sameSize) {
+        throw new IllegalArgumentException(
+          "argument results must be the same size as argument list");
+      }
+      processBatchCallback(list, tableName, pool, results, null);
+    }
+
+    /** Variant taking the table name as raw bytes; converts it and delegates. */
+    @Override
+    @Deprecated
+    public void processBatch(List<? extends Row> list,
+        final byte[] tableName,
+        ExecutorService pool,
+        Object[] results) throws IOException, InterruptedException {
+      final TableName table = TableName.valueOf(tableName);
+      processBatch(list, table, pool, results);
+    }
+
+    /**
+     * Submits every action to the AsyncProcess, which sends them in parallel to the region
+     * servers and retries failures, then blocks until all have completed.
+     * On normal return there was no error and 'results' contains no exception. Otherwise the
+     * collected errors are thrown, and 'results' holds a mix of results and exceptions.
+     * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
+     */
+    @Override
+    @Deprecated
+    public <R> void processBatchCallback(
+      List<? extends Row> list,
+      TableName tableName,
+      ExecutorService pool,
+      Object[] results,
+      Batch.Callback<R> callback)
+      throws IOException, InterruptedException {
+
+      final AsyncRequestFuture future =
+          this.asyncProcess.submitAll(pool, tableName, list, callback, results);
+      future.waitUntilDone();
+      if (!future.hasError()) {
+        return;
+      }
+      throw future.getErrors();
+    }
+
+    /** Variant taking the table name as raw bytes; converts it and delegates. */
+    @Override
+    @Deprecated
+    public <R> void processBatchCallback(
+      List<? extends Row> list,
+      byte[] tableName,
+      ExecutorService pool,
+      Object[] results,
+      Batch.Callback<R> callback)
+      throws IOException, InterruptedException {
+      final TableName table = TableName.valueOf(tableName);
+      processBatchCallback(list, table, pool, results, callback);
+    }
+
+    // For tests to override.
+    protected AsyncProcess createAsyncProcess(Configuration conf) {
+      // No default pool available.
+      return new AsyncProcess(
+          this, conf, this.batchPool, RpcRetryingCallerFactory.instantiate(conf), false);
+    }
+
+    /** Returns the AsyncProcess this connection submits batches through. */
+    @Override
+    public AsyncProcess getAsyncProcess() {
+      return this.asyncProcess;
+    }
+
+    /**
+     * Returns the number of region locations cached for the given table.
+     * Only meant to be called from unit tests.
+     */
+    @VisibleForTesting
+    int getNumberOfCachedRegionLocations(final TableName tableName) {
+      return this.metaCache.getNumberOfCachedRegionLocations(tableName);
+    }
+
+    /** Turns region cache prefetching on or off for the given table. */
+    @Override
+    public void setRegionCachePrefetch(final TableName tableName,
+        final boolean enable) {
+      this.metaCache.setRegionCachePrefetch(tableName, enable);
+    }
+
+    /** Variant taking the table name as raw bytes; converts it and delegates. */
+    @Override
+    public void setRegionCachePrefetch(final byte[] tableName,
+        final boolean enable) {
+      final TableName table = TableName.valueOf(tableName);
+      setRegionCachePrefetch(table, enable);
+    }
+
+    /** Tells whether region cache prefetching is on for the given table. */
+    @Override
+    public boolean getRegionCachePrefetch(TableName tableName) {
+      return this.metaCache.getRegionCachePrefetch(tableName);
+    }
+
+    /** Variant taking the table name as raw bytes; converts it and delegates. */
+    @Override
+    public boolean getRegionCachePrefetch(byte[] tableName) {
+      final TableName table = TableName.valueOf(tableName);
+      return getRegionCachePrefetch(table);
+    }
+
+    /**
+     * Handles a fatal problem reported to this connection.
+     * <p>
+     * A ZooKeeper session expiry while a shared watcher exists is recoverable: only the
+     * watcher is closed, and it is recreated on next use. Anything else is logged as fatal,
+     * marks the connection aborted, and closes it.
+     */
+    @Override
+    public void abort(final String msg, Throwable t) {
+      final boolean zkSessionLost = t instanceof KeeperException.SessionExpiredException
+          && keepAliveZookeeper != null;
+      if (zkSessionLost) {
+        synchronized (masterAndZKLock) {
+          // Re-checked under the lock: the unlocked test above is only a fast path.
+          if (keepAliveZookeeper == null) {
+            return;
+          }
+          LOG.warn("This client just lost it's session with ZooKeeper," +
+            " closing it." +
+            " It will be recreated next time someone needs it", t);
+          closeZooKeeperWatcher();
+        }
+        return;
+      }
+      if (t == null) {
+        LOG.fatal(msg);
+      } else {
+        LOG.fatal(msg, t);
+      }
+      this.aborted = true;
+      close();
+      this.closed = true;
+    }
+
+    /** Returns whether this connection has been closed. */
+    @Override
+    public boolean isClosed() {
+      return closed;
+    }
+
+    /** Returns whether this connection has been aborted. */
+    @Override
+    public boolean isAborted() {
+      return aborted;
+    }
+
+    /** Returns the current number of region servers, as reported by the cluster registry. */
+    @Override
+    public int getCurrentNrHRS() throws IOException {
+      final int regionServers = this.registry.getCurrentNrHRS();
+      return regionServers;
+    }
+
+    /**
+     * Records one more reference to this connection.
+     */
+    void incCount() {
+      refCount += 1;
+    }
+
+    /**
+     * Drops one reference to this connection; the count never goes below zero.
+     */
+    void decCount() {
+      if (refCount <= 0) {
+        return;
+      }
+      refCount--;
+    }
+
+    /**
+     * Tells whether nobody references this connection any more.
+     *
+     * @return true if the reference count is zero; false otherwise
+     */
+    boolean isZeroReference() {
+      return 0 == refCount;
+    }
+
+    /**
+     * Releases everything this connection holds, regardless of reference counts. It stops the
+     * delayed-closing chore, closes the shared master stub and shuts the batch pool down. It
+     * then marks the connection closed, closes the shared ZooKeeper watcher, drops the cached
+     * service stubs and closes the cluster status listener if there is one. Does nothing if the
+     * connection is already closed.
+     * NOTE(review): the closed check is not atomic with the rest of the method; confirm that
+     * concurrent callers cannot both get past it.
+     */
+    void internalClose() {
+      if (this.closed) {
+        return;
+      }
+      delayedClosing.stop("Closing connection");
+      closeMaster();
+      shutdownBatchPool();
+      // Because this is set before closeZooKeeperWatcher(), getKeepAliveZooKeeperWatcher()
+      // will refuse to create a new watcher once the current one has been dropped.
+      this.closed = true;
+      closeZooKeeperWatcher();
+      this.stubs.clear();
+      if (clusterStatusListener != null) {
+        clusterStatusListener.close();
+      }
+    }
+
+    /**
+     * Closes this connection. An unmanaged connection is torn down directly. A managed one is
+     * handed back to ConnectionManager: deleted as stale if it was aborted, otherwise removed
+     * through the normal path.
+     */
+    @Override
+    public void close() {
+      if (!managed) {
+        internalClose();
+        return;
+      }
+      if (aborted) {
+        ConnectionManager.deleteStaleConnection(this);
+      } else {
+        ConnectionManager.deleteConnection(this, false);
+      }
+    }
+
+    /**
+     * Close the connection for good, regardless of what the current value of
+     * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
+     * point, which would be the case if all of its consumers close the
+     * connection. However, on the off chance that someone is unable to close
+     * the connection, perhaps because it bailed out prematurely, the method
+     * below will ensure that this {@link HConnection} instance is cleaned up.

[... 238 lines stripped ...]


Mime
View raw message