hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jxi...@apache.org
Subject svn commit: r1581479 [5/9] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-it/src/test/java/org/apache/hadoop/hbase/ hbase-it/src/test/java/org/apache/hadoo...
Date Tue, 25 Mar 2014 19:34:55 GMT
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1581479&r1=1581478&r2=1581479&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Mar 25 19:34:52 2014
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.regionse
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 import java.lang.reflect.Constructor;
@@ -38,145 +36,61 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.ObjectName;
+import javax.servlet.http.HttpServlet;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.HBaseZeroCopyByteString;
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.ZNodeClearer;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
-import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.DroppedSnapshotException;
-import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
-import org.apache.hadoop.hbase.exceptions.OperationConflictException;
-import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
@@ -184,40 +98,32 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
-import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
-import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
-import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Sleeper;
-import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
@@ -230,18 +136,15 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics.util.MBeanUtil;
-import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
 
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -249,16 +152,11 @@ import com.google.protobuf.TextFormat;
  */
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
-public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
-  AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
-  HBaseRPCErrorHandler, LastSequenceId {
+public class HRegionServer extends HasThread implements
+    RegionServerServices, LastSequenceId {
 
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
 
-  private final Random rand;
-
-  private final AtomicLong scannerIdGen = new AtomicLong(0L);
-
   /*
    * Strings to be used in forming the exception message for
    * RegionsAlreadyInTransitionException.
@@ -272,12 +170,6 @@ public class HRegionServer implements Cl
   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
 
-  /** RPC scheduler to use for the region server. */
-  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
-      "hbase.region.server.rpc.scheduler.factory.class";
-
-  protected long maxScannerResultSize;
-
   // Cache flushing
   protected MemStoreFlusher cacheFlusher;
 
@@ -302,9 +194,6 @@ public class HRegionServer implements Cl
   // Compactions
   public CompactSplitThread compactSplitThread;
 
-  final ConcurrentHashMap<String, RegionScannerHolder> scanners =
-      new ConcurrentHashMap<String, RegionScannerHolder>();
-
   /**
    * Map of regions currently being served by this region server. Key is the
    * encoded region name.  All access should be synchronized.
@@ -337,9 +226,6 @@ public class HRegionServer implements Cl
   // Instance of the hbase executor service.
   protected ExecutorService service;
 
-  // Request counter. (Includes requests that are not serviced by regions.)
-  final Counter requestCount = new Counter();
-
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;
   protected HFileSystem fs;
@@ -347,14 +233,11 @@ public class HRegionServer implements Cl
   // Set when a report to the master comes back with a message asking us to
   // shutdown. Also set by call to stop when debugging or running unit tests
   // of HRegionServer in isolation.
-  protected volatile boolean stopped = false;
+  private volatile boolean stopped = false;
 
   // Go down hard. Used if file system becomes unavailable and also in
   // debugging and unit tests.
-  protected volatile boolean abortRequested;
-
-  // region server static info like info port
-  private RegionServerInfo.Builder rsInfo;
+  private volatile boolean abortRequested;
 
   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
 
@@ -372,7 +255,7 @@ public class HRegionServer implements Cl
 
   final int numRetries;
   protected final int threadWakeFrequency;
-  private final int msgInterval;
+  protected final int msgInterval;
 
   protected final int numRegionsToReport;
 
@@ -381,26 +264,18 @@ public class HRegionServer implements Cl
   // RPC client. Used to make the stub above that does region server status checking.
   RpcClient rpcClient;
 
-  // Server to handle client requests. Default access so can be accessed by
-  // unit tests.
-  RpcServerInterface rpcServer;
-
-  private final InetSocketAddress isa;
   private UncaughtExceptionHandler uncaughtExceptionHandler;
 
   // Info server. Default access so can be used by unit tests. REGIONSERVER
   // is name of the webapp and the attribute name used stuffing this instance
   // into web context.
-  InfoServer infoServer;
+  protected InfoServer infoServer;
   private JvmPauseMonitor pauseMonitor;
 
   /** region server process name */
   public static final String REGIONSERVER = "regionserver";
 
-  /** region server configuration name */
-  public static final String REGIONSERVER_CONF = "regionserver_conf";
-
-  private MetricsRegionServer metricsRegionServer;
+  MetricsRegionServer metricsRegionServer;
   private SpanReceiverHost spanReceiverHost;
 
   /*
@@ -423,23 +298,23 @@ public class HRegionServer implements Cl
   LogRoller hlogRoller;
   LogRoller metaHLogRoller;
 
-  // flag set after we're done setting up server threads (used for testing)
-  protected volatile boolean isOnline;
+  // flag set after we're done setting up server threads
+  protected AtomicBoolean online;
 
   // zookeeper connection and watcher
-  private ZooKeeperWatcher zooKeeper;
+  protected ZooKeeperWatcher zooKeeper;
 
   // master address tracker
   private MasterAddressTracker masterAddressTracker;
 
   // Cluster Status Tracker
-  private ClusterStatusTracker clusterStatusTracker;
+  protected ClusterStatusTracker clusterStatusTracker;
 
   // Log Splitting Worker
   private SplitLogWorker splitLogWorker;
 
   // A sleeper that sleeps for msgInterval.
-  private final Sleeper sleeper;
+  protected final Sleeper sleeper;
 
   private final int operationTimeout;
 
@@ -460,12 +335,12 @@ public class HRegionServer implements Cl
    * against  Master.  The hostname can differ from the hostname in {@link #isa}
    * but usually doesn't if both servers resolve .
    */
-  private ServerName serverNameFromMasterPOV;
+  protected ServerName serverName;
 
   /**
    * This servers startcode.
    */
-  private final long startcode;
+  protected final long startcode;
 
   /**
    * Unique identifier for the cluster we are a part of.
@@ -482,25 +357,15 @@ public class HRegionServer implements Cl
    */
   private MovedRegionsCleaner movedRegionsCleaner;
 
-  /**
-   * The lease timeout period for client scanners (milliseconds).
-   */
-  private final int scannerLeaseTimeoutPeriod;
-
-  /**
-   * The reference to the priority extraction function
-   */
-  private final PriorityFunction priority;
-
   private RegionServerCoprocessorHost rsHost;
 
   private RegionServerProcedureManagerHost rspmHost;
 
   // configuration setting on if replay WAL edits directly to another RS
-  private final boolean distributedLogReplay;
+  protected final boolean distributedLogReplay;
 
   // Table level lock manager for locking for region operations
-  private TableLockManager tableLockManager;
+  protected TableLockManager tableLockManager;
 
   /**
    * Nonce manager. Nonces are used to make operations like increment and append idempotent
@@ -520,10 +385,12 @@ public class HRegionServer implements Cl
    * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
    * latest nonce in it expired. It can also be recovered during move.
    */
-  private final ServerNonceManager nonceManager;
+  final ServerNonceManager nonceManager;
 
   private UserProvider userProvider;
 
+  protected final RSRpcServices rpcServices;
+
   /**
    * Starts a HRegionServer at the default location
    *
@@ -532,13 +399,12 @@ public class HRegionServer implements Cl
    * @throws InterruptedException
    */
   public HRegionServer(Configuration conf)
-  throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     this.fsOk = true;
     this.conf = conf;
-    this.isOnline = false;
     checkCodecs(this.conf);
+    this.online = new AtomicBoolean(false);
     this.userProvider = UserProvider.instantiate(conf);
-
     FSUtils.setupShortCircuitRead(this.conf);
 
     // Config'ed params
@@ -552,10 +418,6 @@ public class HRegionServer implements Cl
     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
 
-    this.maxScannerResultSize = conf.getLong(
-      HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
-      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
-
     this.numRegionsToReport = conf.getInt(
       "hbase.regionserver.numregionstoreport", 10);
 
@@ -566,58 +428,19 @@ public class HRegionServer implements Cl
     this.abortRequested = false;
     this.stopped = false;
 
-    this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(conf,
-      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
-      HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
-
-    // Server to handle client requests.
-    String hostname = conf.get("hbase.regionserver.ipc.address",
-      Strings.domainNamePointerToHostName(DNS.getDefaultHost(
-        conf.get("hbase.regionserver.dns.interface", "default"),
-        conf.get("hbase.regionserver.dns.nameserver", "default"))));
-    int port = conf.getInt(HConstants.REGIONSERVER_PORT,
-      HConstants.DEFAULT_REGIONSERVER_PORT);
-    // Creation of a HSA will force a resolve.
-    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
-    if (initialIsa.getAddress() == null) {
-      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
-    }
-    this.rand = new Random(initialIsa.hashCode());
-    String name = "regionserver/" + initialIsa.toString();
-    // Set how many times to retry talking to another server over HConnection.
-    ConnectionUtils.setServerSideHConnectionRetriesConfig(this.conf, name, LOG);
-    this.priority = new AnnotationReadingPriorityFunction(this);
-    RpcSchedulerFactory rpcSchedulerFactory;
-    try {
-      Class<?> rpcSchedulerFactoryClass = conf.getClass(
-          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
-          SimpleRpcSchedulerFactory.class);
-      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
-    } catch (InstantiationException e) {
-      throw new IllegalArgumentException(e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException(e);
-    }
-    this.rpcServer = new RpcServer(this, name, getServices(),
-      /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
-      initialIsa, // BindAddress is IP we got for this server.
-      conf,
-      rpcSchedulerFactory.create(conf, this));
-
-    // Set our address.
-    this.isa = this.rpcServer.getListenerAddress();
-
-    this.rpcServer.setErrorHandler(this);
+    rpcServices = createRpcServices();
     this.startcode = System.currentTimeMillis();
+    String hostName = rpcServices.isa.getHostName();
+    serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
+    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
 
     // login the zookeeper client principal (if using security)
     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
-      "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
+      "hbase.zookeeper.client.kerberos.principal", hostName);
 
     // login the server principal (if using secure Hadoop)
     userProvider.login("hbase.regionserver.keytab.file",
-      "hbase.regionserver.kerberos.principal", this.isa.getHostName());
+      "hbase.regionserver.kerberos.principal", hostName);
     regionServerAccounting = new RegionServerAccounting();
     cacheConfig = new CacheConfig(conf);
     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
@@ -627,25 +450,80 @@ public class HRegionServer implements Cl
       }
     };
 
-    this.rsInfo = RegionServerInfo.newBuilder();
-    // Put up the webui. Webui may come up on port other than configured if
-    // that port is occupied. Adjust serverInfo if this is the case.
-    this.rsInfo.setInfoPort(putUpWebUI());
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
+    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
+    // underlying hadoop hdfs accessors will be going against wrong filesystem
+    // (unless all is set to defaults).
+    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
+    // Get fs instance used by this RS.  Do we use checksum verification in hbase? If hbase
+    // checksum verification is enabled, then automatically switch off hdfs checksum verification.
+    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
+    this.rootDir = FSUtils.getRootDir(this.conf);
+    this.tableDescriptors = new FSTableDescriptors(
+      this.fs, this.rootDir, !canUpdateTableDescriptor());
+
+    service = new ExecutorService(getServerName().toShortString());
+    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
+
+    // Some unit tests don't need a cluster, so no zookeeper at all
+    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+      // Open connection to zookeeper and set primary watcher
+      zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
+        rpcServices.isa.getPort(), this, canCreateBaseZNode());
+  
+      tableLockManager = TableLockManager.createTableLockManager(
+        conf, zooKeeper, serverName);
+  
+      masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
+      masterAddressTracker.start();
+  
+      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
+      clusterStatusTracker.start();
+  
+      catalogTracker = createCatalogTracker();
+      catalogTracker.start();
+    }
+
+    putUpWebUI();
+  }
+
+  protected String getProcessName() {
+    return REGIONSERVER;
+  }
+
+  protected boolean canCreateBaseZNode() {
+    return false;
+  }
+
+  protected boolean canUpdateTableDescriptor() {
+    return false;
+  }
+
+  protected RSRpcServices createRpcServices() throws IOException {
+    return new RSRpcServices(this);
+  }
+
+  protected void configureInfoServer() {
+    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
+    infoServer.setAttribute(REGIONSERVER, this);
+  }
+
+  protected Class<? extends HttpServlet> getDumpServlet() {
+    return RSDumpServlet.class;
+  }
+
+  protected void doMetrics() {
   }
 
   /**
-   * @return list of blocking services and their security info classes that this server supports
+   * Create CatalogTracker.
+   * In its own method so that tests can intercept and mock it.
+   * @throws IOException
    */
-  private List<BlockingServiceAndInterface> getServices() {
-    List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
-    bssi.add(new BlockingServiceAndInterface(
-        ClientProtos.ClientService.newReflectiveBlockingService(this),
-        ClientProtos.ClientService.BlockingInterface.class));
-    bssi.add(new BlockingServiceAndInterface(
-        AdminProtos.AdminService.newReflectiveBlockingService(this),
-        AdminProtos.AdminService.BlockingInterface.class));
-    return bssi;
+  protected CatalogTracker createCatalogTracker() throws IOException {
+    HConnection conn = ConnectionUtils.createShortCircuitHConnection(
+      HConnectionManager.getConnection(conf), serverName, rpcServices, rpcServices);
+    return new CatalogTracker(zooKeeper, conf, conn, this);
   }
 
   /**
@@ -665,33 +543,10 @@ public class HRegionServer implements Cl
     }
   }
 
-  String getClusterId() {
+  public String getClusterId() {
     return this.clusterId;
   }
 
-  @Override
-  public int getPriority(RequestHeader header, Message param) {
-    return priority.getPriority(header, param);
-  }
-
-  @Retention(RetentionPolicy.RUNTIME)
-  protected @interface QosPriority {
-    int priority() default 0;
-  }
-
-  PriorityFunction getPriority() {
-    return priority;
-  }
-
-  RegionScanner getScanner(long scannerId) {
-    String scannerIdString = Long.toString(scannerId);
-    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
-    if (scannerHolder != null) {
-      return scannerHolder.s;
-    }
-    return null;
-  }
-
   /**
    * All initialization needed before we go register with Master.
    *
@@ -705,7 +560,7 @@ public class HRegionServer implements Cl
     } catch (Throwable t) {
       // Call stop if error or process will stick around for ever since server
       // puts up non-daemon threads.
-      this.rpcServer.stop();
+      this.rpcServices.stop();
       abort("Initialization of RS failed.  Hence aborting RS.", t);
     }
   }
@@ -719,10 +574,6 @@ public class HRegionServer implements Cl
    * @throws InterruptedException
    */
   private void initializeZooKeeper() throws IOException, InterruptedException {
-    // Open connection to zookeeper and set primary watcher
-    this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
-      this.isa.getPort(), this);
-
     // Create the master address tracker, register with zk, and start it.  Then
     // block until a master is available.  No point in starting up if no master
     // running.
@@ -732,14 +583,8 @@ public class HRegionServer implements Cl
 
     // Wait on cluster being up.  Master will set this flag up in zookeeper
     // when ready.
-    this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
-    this.clusterStatusTracker.start();
     blockAndCheckIfStopped(this.clusterStatusTracker);
 
-    // Create the catalog tracker and start it;
-    this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this);
-    catalogTracker.start();
-
     // Retrieve clusterId
     // Since cluster status is now up
     // ID should have already been set by HMaster
@@ -761,9 +606,6 @@ public class HRegionServer implements Cl
     } catch (KeeperException e) {
       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
     }
-    this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
-        ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
-
     // register watcher for recovering regions
     if(this.distributedLogReplay) {
       this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
@@ -823,7 +665,7 @@ public class HRegionServer implements Cl
 
     // Setup RPC client for master communication
     rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
-        this.isa.getAddress(), 0));
+      rpcServices.isa.getAddress(), 0));
     this.pauseMonitor = new JvmPauseMonitor(conf);
     pauseMonitor.start();
   }
@@ -841,12 +683,14 @@ public class HRegionServer implements Cl
     }
 
     try {
-      // Set our ephemeral znode up in zookeeper now we have a name.
-      createMyEphemeralNode();
-
-      // Initialize the RegionServerCoprocessorHost now that our ephemeral
-      // node was created, in case any coprocessors want to use ZooKeeper
-      this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
+      if (!isStopped() && !isAborted()) {
+        ShutdownHook.install(conf, fs, this, Thread.currentThread());
+        // Set our ephemeral znode up in zookeeper now we have a name.
+        createMyEphemeralNode();
+        // Initialize the RegionServerCoprocessorHost now that our ephemeral
+        // node was created, in case any coprocessors want to use ZooKeeper
+        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
+      }
 
       // Try and register with the Master; tell it we are here.  Break if
       // server is stopped or the clusterup flag is down or hdfs went wacky.
@@ -861,17 +705,17 @@ public class HRegionServer implements Cl
         }
       }
 
-      if (!this.stopped && isHealthy()){
+      if (!isStopped() && isHealthy()){
         // start the snapshot handler and other procedure handlers,
         // since the server is ready to run
         rspmHost.start();
       }
 
       // We registered with the Master.  Go into run mode.
-      long lastMsg = 0;
+      long lastMsg = System.currentTimeMillis();
       long oldRequestCount = -1;
       // The main run loop.
-      while (!this.stopped && isHealthy()) {
+      while (!isStopped() && isHealthy()) {
         if (!isClusterUp()) {
           if (isOnlineRegionsEmpty()) {
             stop("Exiting; cluster shutdown set and not carrying any regions");
@@ -903,11 +747,14 @@ public class HRegionServer implements Cl
         if ((now - lastMsg) >= msgInterval) {
           tryRegionServerReport(lastMsg, now);
           lastMsg = System.currentTimeMillis();
+          doMetrics();
+        }
+        if (!isStopped() && !isAborted()) {
+          this.sleeper.sleep();
         }
-        if (!this.stopped) this.sleeper.sleep();
       } // for
     } catch (Throwable t) {
-      if (!checkOOME(t)) {
+      if (!rpcServices.checkOOME(t)) {
         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
         abort(prefix + t.getMessage(), t);
       }
@@ -918,7 +765,6 @@ public class HRegionServer implements Cl
       mxBean = null;
     }
     if (this.leases != null) this.leases.closeAfterLeasesExpire();
-    this.rpcServer.stop();
     if (this.splitLogWorker != null) {
       splitLogWorker.stop();
     }
@@ -942,8 +788,6 @@ public class HRegionServer implements Cl
     if(this.hMemManager != null) this.hMemManager.stop();
     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
-    if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
-    if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
     if (this.compactionChecker != null)
       this.compactionChecker.interrupt();
     if (this.healthCheckChore != null) {
@@ -962,11 +806,10 @@ public class HRegionServer implements Cl
       if (this.fsOk) {
         closeUserRegions(abortRequested); // Don't leave any open file handles
       }
-      LOG.info("aborting server " + this.serverNameFromMasterPOV);
+      LOG.info("aborting server " + this.serverName);
     } else {
       closeUserRegions(abortRequested);
-      closeAllScanners();
-      LOG.info("stopping server " + this.serverNameFromMasterPOV);
+      LOG.info("stopping server " + this.serverName);
     }
     // Interrupt catalog tracker here in case any regions being opened out in
     // handlers are stuck waiting on meta.
@@ -985,7 +828,7 @@ public class HRegionServer implements Cl
 
     if (!this.killed && this.fsOk) {
       waitOnAllRegionsToClose(abortRequested);
-      LOG.info("stopping server " + this.serverNameFromMasterPOV +
+      LOG.info("stopping server " + this.serverName +
         "; all regions closed.");
     }
 
@@ -1005,9 +848,11 @@ public class HRegionServer implements Cl
     }
 
     if (!killed) {
-      join();
+      stopServiceThreads();
     }
 
+    this.rpcServices.stop();
+
     try {
       deleteMyEphemeralNode();
     } catch (KeeperException e) {
@@ -1016,8 +861,9 @@ public class HRegionServer implements Cl
     // We may have failed to delete the znode at the previous step, but
     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
+
     this.zooKeeper.close();
-    LOG.info("stopping server " + this.serverNameFromMasterPOV +
+    LOG.info("stopping server " + this.serverName +
       "; zookeeper connection closed.");
 
     LOG.info(Thread.currentThread().getName() + " exiting");
@@ -1061,7 +907,7 @@ public class HRegionServer implements Cl
     try {
       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
       ServerName sn = ServerName.parseVersionedServerName(
-        this.serverNameFromMasterPOV.getVersionedBytes());
+        this.serverName.getVersionedBytes());
       request.setServer(ProtobufUtil.toServerName(sn));
       request.setLoad(sl);
       this.rssStub.regionServerReport(null, request.build());
@@ -1210,18 +1056,6 @@ public class HRegionServer implements Cl
     }
   }
 
-  private void closeAllScanners() {
-    // Close any outstanding scanners. Means they'll get an UnknownScanner
-    // exception next time they come in.
-    for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
-      try {
-        e.getValue().s.close();
-      } catch (IOException ioe) {
-        LOG.warn("Closing scanner " + e.getKey(), ioe);
-      }
-    }
-  }
-
   /*
    * Run init. Sets up hlog and starts up all server threads.
    *
@@ -1235,11 +1069,11 @@ public class HRegionServer implements Cl
         // The hostname the master sees us as.
         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
           String hostnameFromMasterPOV = e.getValue();
-          this.serverNameFromMasterPOV = ServerName.valueOf(hostnameFromMasterPOV,
-              this.isa.getPort(), this.startcode);
-          if (!hostnameFromMasterPOV.equals(this.isa.getHostName())) {
+          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
+            rpcServices.isa.getPort(), this.startcode);
+          if (!hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
             LOG.info("Master passed us a different hostname to use; was=" +
-              this.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
+              rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
           }
           continue;
         }
@@ -1254,39 +1088,27 @@ public class HRegionServer implements Cl
       // config param for task trackers, but we can piggyback off of it.
       if (this.conf.get("mapred.task.id") == null) {
         this.conf.set("mapred.task.id", "hb_rs_" +
-          this.serverNameFromMasterPOV.toString());
+          this.serverName.toString());
       }
 
       // Save it in a file, this will allow to see if we crash
       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
 
-      // Master sent us hbase.rootdir to use. Should be fully qualified
-      // path with file system specification included. Set 'fs.defaultFS'
-      // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
-      // accessors will be going against wrong filesystem (unless all is set
-      // to defaults).
-      FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
-      // Get fs instance used by this RS.  Do we use checksum verification in the hbase? If hbase
-      // checksum verification enabled, then automatically switch off hdfs checksum verification.
-      boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
-      this.fs = new HFileSystem(this.conf, useHBaseChecksum);
-      this.rootDir = FSUtils.getRootDir(this.conf);
-      this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
       this.hlog = setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
 
-      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
-
       startServiceThreads();
       startHeapMemoryManager();
-      LOG.info("Serving as " + this.serverNameFromMasterPOV +
-        ", RpcServer on " + this.isa +
+      LOG.info("Serving as " + this.serverName +
+        ", RpcServer on " + rpcServices.isa +
         ", sessionid=0x" +
         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
-      isOnline = true;
+      synchronized (online) {
+        online.set(true);
+        online.notifyAll();
+      }
     } catch (Throwable e) {
-      this.isOnline = false;
       stop("Failed initialization");
       throw convertThrowableToIOE(cleanup(e, "Failed init"),
           "Region server startup failed");
@@ -1303,6 +1125,8 @@ public class HRegionServer implements Cl
   }
 
   private void createMyEphemeralNode() throws KeeperException, IOException {
+    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
+    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
       getMyEphemeralNodePath(), data);
@@ -1473,7 +1297,7 @@ public class HRegionServer implements Cl
         if (r.shouldFlush()) {
           FlushRequester requester = server.getFlushRequester();
           if (requester != null) {
-            long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
+            long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
             LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
                 " after a delay of " + randomDelay);
             //Throttle the flushes by putting a delay. If we don't throttle, and there
@@ -1494,7 +1318,7 @@ public class HRegionServer implements Cl
    * @return true if online, false if not.
    */
   public boolean isOnline() {
-    return isOnline;
+    return online.get();
   }
 
   /**
@@ -1506,13 +1330,13 @@ public class HRegionServer implements Cl
   private HLog setupWALAndReplication() throws IOException {
     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     final String logName
-      = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
+      = HLogUtil.getHLogDirectoryName(this.serverName.toString());
 
     Path logdir = new Path(rootDir, logName);
     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
     if (this.fs.exists(logdir)) {
       throw new RegionServerRunningException("Region server has already " +
-        "created directory at " + this.serverNameFromMasterPOV.toString());
+        "created directory at " + this.serverName.toString());
     }
 
     // Instantiate replication manager if replication enabled.  Pass it the
@@ -1524,11 +1348,11 @@ public class HRegionServer implements Cl
 
   private HLog getMetaWAL() throws IOException {
     if (this.hlogForMeta != null) return this.hlogForMeta;
-    final String logName = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
+    final String logName = HLogUtil.getHLogDirectoryName(this.serverName.toString());
     Path logdir = new Path(rootDir, logName);
     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
     this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
-      this.conf, getMetaWALActionListeners(), this.serverNameFromMasterPOV.toString());
+      this.conf, getMetaWALActionListeners(), this.serverName.toString());
     return this.hlogForMeta;
   }
 
@@ -1541,7 +1365,7 @@ public class HRegionServer implements Cl
    */
   protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
     return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
-      getWALActionListeners(), this.serverNameFromMasterPOV.toString());
+      getWALActionListeners(), this.serverName.toString());
   }
 
   /**
@@ -1581,7 +1405,7 @@ public class HRegionServer implements Cl
     return hlogRoller;
   }
 
-  public MetricsRegionServer getMetrics() {
+  public MetricsRegionServer getRegionServerMetrics() {
     return this.metricsRegionServer;
   }
 
@@ -1605,9 +1429,7 @@ public class HRegionServer implements Cl
    * hosting server. Worker logs the exception and exits.
    */
   private void startServiceThreads() throws IOException {
-    String n = Thread.currentThread().getName();
     // Start executor services
-    this.service = new ExecutorService(getServerName().toShortString());
     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
@@ -1623,25 +1445,25 @@ public class HRegionServer implements Cl
     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
       conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
 
-    Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
+    Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
         uncaughtExceptionHandler);
     this.cacheFlusher.start(uncaughtExceptionHandler);
-    Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
+    Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
       ".compactionChecker", uncaughtExceptionHandler);
-    Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
+    Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() +
         ".periodicFlusher", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
-      Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
+      Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker",
             uncaughtExceptionHandler);
     }
     if (this.nonceManagerChore != null) {
-      Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), n + ".nonceCleaner",
+      Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner",
             uncaughtExceptionHandler);
     }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
-    this.leases.setName(n + ".leaseChecker");
+    this.leases.setName(getName() + ".leaseChecker");
     this.leases.start();
 
     if (this.replicationSourceHandler == this.replicationSinkHandler &&
@@ -1658,7 +1480,7 @@ public class HRegionServer implements Cl
 
     // Start Server.  This service is like leases in that it internally runs
     // a thread.
-    this.rpcServer.start();
+    rpcServices.rpcServer.start();
 
     // Create the log splitting worker and start it
     // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
@@ -1681,7 +1503,7 @@ public class HRegionServer implements Cl
    */
   private int putUpWebUI() throws IOException {
     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
-				HConstants.DEFAULT_REGIONSERVER_INFOPORT);
+      HConstants.DEFAULT_REGIONSERVER_INFOPORT);
     // -1 is for disabling info server
     if (port < 0) return port;
     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
@@ -1690,11 +1512,9 @@ public class HRegionServer implements Cl
         false);
     while (true) {
       try {
-        this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
-        this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
-        this.infoServer.addServlet("dump", "/dump", RSDumpServlet.class);
-        this.infoServer.setAttribute(REGIONSERVER, this);
-        this.infoServer.setAttribute(REGIONSERVER_CONF, conf);
+        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
+        infoServer.addServlet("dump", "/dump", getDumpServlet());
+        configureInfoServer();
         this.infoServer.start();
         break;
       } catch (BindException e) {
@@ -1708,7 +1528,10 @@ public class HRegionServer implements Cl
         port++;
       }
     }
-    return this.infoServer.getPort();
+    port = this.infoServer.getPort();
+    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
+    conf.setInt(HConstants.MASTER_INFO_PORT, port);
+    return port;
   }
 
   /*
@@ -1763,9 +1586,9 @@ public class HRegionServer implements Cl
   @Override
   public void stop(final String msg) {
     try {
-    	if (this.rsHost != null) {
-    		this.rsHost.preStop(msg);
-    	}
+      if (this.rsHost != null) {
+        this.rsHost.preStop(msg);
+      }
       this.stopped = true;
       LOG.info("STOPPED: " + msg);
       // Wakes run() if it is sleeping
@@ -1784,7 +1607,7 @@ public class HRegionServer implements Cl
   @Override
   public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
   throws KeeperException, IOException {
-    checkOpen();
+    rpcServices.checkOpen();
     LOG.info("Post open deploy tasks for region=" + r.getRegionNameAsString());
     // Do checks to see if we need to compact (references or too many files)
     for (Store s : r.getStores().values()) {
@@ -1804,11 +1627,10 @@ public class HRegionServer implements Cl
 
     // Update ZK, or META
     if (r.getRegionInfo().isMetaRegion()) {
-      MetaRegionTracker.setMetaLocation(getZooKeeper(),
-          this.serverNameFromMasterPOV);
+      MetaRegionTracker.setMetaLocation(getZooKeeper(), serverName);
     } else {
       MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
-        this.serverNameFromMasterPOV, openSeqNum);
+        this.serverName, openSeqNum);
     }
     LOG.info("Finished post open deploy task for " + r.getRegionNameAsString());
 
@@ -1816,7 +1638,12 @@ public class HRegionServer implements Cl
 
   @Override
   public RpcServerInterface getRpcServer() {
-    return rpcServer;
+    return rpcServices.rpcServer;
+  }
+
+  @VisibleForTesting
+  public RSRpcServices getRSRpcServices() {
+    return rpcServices;
   }
 
   /**
@@ -1849,11 +1676,11 @@ public class HRegionServer implements Cl
         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
       }
       // Report to the master but only if we have already registered with the master.
-      if (rssStub != null && this.serverNameFromMasterPOV != null) {
+      if (rssStub != null && this.serverName != null) {
         ReportRSFatalErrorRequest.Builder builder =
           ReportRSFatalErrorRequest.newBuilder();
         ServerName sn =
-          ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
+          ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
         builder.setServer(ProtobufUtil.toServerName(sn));
         builder.setErrorMessage(msg);
         rssStub.reportRSFatalError(null, builder.build());
@@ -1890,7 +1717,7 @@ public class HRegionServer implements Cl
    * Wait on all threads to finish. Presumption is that all closes and stops
    * have already been called.
    */
-  protected void join() {
+  protected void stopServiceThreads() {
     if (this.nonceManagerChore != null) {
       Threads.shutdown(this.nonceManagerChore.getThread());
     }
@@ -1951,7 +1778,7 @@ public class HRegionServer implements Cl
    * @return master + port, or null if server has been stopped
    */
   private Pair<ServerName, RegionServerStatusService.BlockingInterface>
-  createRegionServerStatusStub() {
+      createRegionServerStatusStub() {
     ServerName sn = null;
     long previousLogTime = 0;
     RegionServerStatusService.BlockingInterface master = null;
@@ -1974,7 +1801,11 @@ public class HRegionServer implements Cl
           continue;
         }
 
-        new InetSocketAddress(sn.getHostname(), sn.getPort());
+        // If we are on the active master, use the shortcut
+        if (this instanceof HMaster && sn.equals(getServerName())) {
+          intf = ((HMaster)this).getMasterRpcServices();
+          break;
+        }
         try {
           BlockingRpcChannel channel =
             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), operationTimeout);
@@ -2033,11 +1864,11 @@ public class HRegionServer implements Cl
     ServerName masterServerName = p.getFirst();
     if (masterServerName == null) return result;
     try {
-      this.requestCount.set(0);
-      LOG.info("reportForDuty to master=" + masterServerName + " with port=" + this.isa.getPort() +
-        ", startcode=" + this.startcode);
+      rpcServices.requestCount.set(0);
+      LOG.info("reportForDuty to master=" + masterServerName + " with port="
+        + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
       long now = EnvironmentEdgeManager.currentTimeMillis();
-      int port = this.isa.getPort();
+      int port = rpcServices.isa.getPort();
       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
       request.setPort(port);
       request.setServerStartCode(this.startcode);
@@ -2281,10 +2112,7 @@ public class HRegionServer implements Cl
 
   @Override
   public ServerName getServerName() {
-    // Our servername could change after we talk to the master.
-    return this.serverNameFromMasterPOV == null?
-        ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), this.startcode) :
-        this.serverNameFromMasterPOV;
+    return serverName;
   }
 
   @Override
@@ -2292,11 +2120,7 @@ public class HRegionServer implements Cl
     return this.compactSplitThread;
   }
 
-  public ZooKeeperWatcher getZooKeeperWatcher() {
-    return this.zooKeeper;
-  }
-
-  public RegionServerCoprocessorHost getCoprocessorHost(){
+  public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
     return this.rsHost;
   }
 
@@ -2372,34 +2196,6 @@ public class HRegionServer implements Cl
   }
 
   /**
-   * @param hrs
-   * @return Thread the RegionServer is running in correctly named.
-   * @throws IOException
-   */
-  public static Thread startRegionServer(final HRegionServer hrs)
-      throws IOException {
-    return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
-  }
-
-  /**
-   * @param hrs
-   * @param name
-   * @return Thread the RegionServer is running in correctly named.
-   * @throws IOException
-   */
-  public static Thread startRegionServer(final HRegionServer hrs,
-      final String name) throws IOException {
-    Thread t = new Thread(hrs);
-    t.setName(name);
-    t.start();
-    // Install shutdown hook that will catch signals and run an orderly shutdown
-    // of the hrs.
-    ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
-        .getConfiguration()), hrs, t);
-    return t;
-  }
-
-  /**
    * Utility for constructing an instance of the passed HRegionServer class.
    *
    * @param regionServerClass
@@ -2457,7 +2253,7 @@ public class HRegionServer implements Cl
    }
 
   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
-  public String[] getCoprocessors() {
+  public String[] getRegionServerCoprocessors() {
     TreeSet<String> coprocessors = new TreeSet<String>(
         this.hlog.getCoprocessorHost().getCoprocessors());
     Collection<HRegion> regions = getOnlineRegionsLocalContext();
@@ -2468,61 +2264,6 @@ public class HRegionServer implements Cl
   }
 
   /**
-   * Instantiated as a scanner lease. If the lease times out, the scanner is
-   * closed
-   */
-  private class ScannerListener implements LeaseListener {
-    private final String scannerName;
-
-    ScannerListener(final String n) {
-      this.scannerName = n;
-    }
-
-    @Override
-    public void leaseExpired() {
-      RegionScannerHolder rsh = scanners.remove(this.scannerName);
-      if (rsh != null) {
-        RegionScanner s = rsh.s;
-        LOG.info("Scanner " + this.scannerName + " lease expired on region "
-            + s.getRegionInfo().getRegionNameAsString());
-        try {
-          HRegion region = getRegion(s.getRegionInfo().getRegionName());
-          if (region != null && region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().preScannerClose(s);
-          }
-
-          s.close();
-          if (region != null && region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().postScannerClose(s);
-          }
-        } catch (IOException e) {
-          LOG.error("Closing scanner for "
-              + s.getRegionInfo().getRegionNameAsString(), e);
-        }
-      } else {
-        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
-            " scanner found, hence no chance to close that related scanner!");
-      }
-    }
-  }
-
-  /**
-   * Called to verify that this server is up and running.
-   *
-   * @throws IOException
-   */
-  protected void checkOpen() throws IOException {
-    if (this.stopped || this.abortRequested) {
-      throw new RegionServerStoppedException("Server " + getServerName() +
-        " not running" + (this.abortRequested ? ", aborting" : ""));
-    }
-    if (!fsOk) {
-      throw new RegionServerStoppedException("File system not available");
-    }
-  }
-
-
-  /**
    * Try to close the region, logs a warning on failure but continues.
    * @param region Region to close
    */
@@ -2713,23 +2454,11 @@ public class HRegionServer implements Cl
    *
    * @param t Throwable
    *
-   * @return Throwable converted to an IOE; methods can only let out IOEs.
-   */
-  protected Throwable cleanup(final Throwable t) {
-    return cleanup(t, null);
-  }
-
-  /*
-   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
-   * IOE if it isn't already.
-   *
-   * @param t Throwable
-   *
    * @param msg Message to log in error. Can be null.
    *
    * @return Throwable converted to an IOE; methods can only let out IOEs.
    */
-  protected Throwable cleanup(final Throwable t, final String msg) {
+  private Throwable cleanup(final Throwable t, final String msg) {
     // Don't log as error if NSRE; NSRE is 'normal' operation.
     if (t instanceof NotServingRegionException) {
       LOG.debug("NotServingRegionException; " + t.getMessage());
@@ -2740,7 +2469,7 @@ public class HRegionServer implements Cl
     } else {
       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
     }
-    if (!checkOOME(t)) {
+    if (!rpcServices.checkOOME(t)) {
       checkFileSystem();
     }
     return t;
@@ -2758,33 +2487,6 @@ public class HRegionServer implements Cl
         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
   }
 
-  /*
-   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
-   *
-   * @param e
-   *
-   * @return True if we OOME'd and are aborting.
-   */
-  @Override
-  public boolean checkOOME(final Throwable e) {
-    boolean stop = false;
-    try {
-      if (e instanceof OutOfMemoryError
-          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
-          || (e.getMessage() != null && e.getMessage().contains(
-              "java.lang.OutOfMemoryError"))) {
-        stop = true;
-        LOG.fatal(
-          "Run out of memory; HRegionServer will abort itself immediately", e);
-      }
-    } finally {
-      if (stop) {
-        Runtime.getRuntime().halt(1);
-      }
-    }
-    return stop;
-  }
-
   /**
    * Checks to see if the file system is still accessible. If not, sets
    * abortRequested and stopRequested
@@ -2803,1642 +2505,63 @@ public class HRegionServer implements Cl
     return this.fsOk;
   }
 
-  protected long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
-    long scannerId = this.scannerIdGen.incrementAndGet();
-    String scannerName = String.valueOf(scannerId);
-
-    RegionScannerHolder existing =
-      scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
-    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
-
-    this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
-        new ScannerListener(scannerName));
-
-    return scannerId;
+  @Override
+  public void updateRegionFavoredNodesMapping(String encodedRegionName,
+      List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
+    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
+    // Refer to the comment on the declaration of regionFavoredNodesMap on why
+    // it is a map of region name to InetSocketAddress[]
+    for (int i = 0; i < favoredNodes.size(); i++) {
+      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
+          favoredNodes.get(i).getPort());
+    }
+    regionFavoredNodesMap.put(encodedRegionName, addr);
   }
 
-  // Start Client methods
-
   /**
-   * Get data from a table.
-   *
-   * @param controller the RPC controller
-   * @param request the get request
-   * @throws ServiceException
+   * Return the favored nodes for a region given its encoded name. Look at the
+   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
+   * @param encodedRegionName
+   * @return array of favored locations
    */
   @Override
-  public GetResponse get(final RpcController controller,
-      final GetRequest request) throws ServiceException {
-    long before = EnvironmentEdgeManager.currentTimeMillis();
-    try {
-      checkOpen();
-      requestCount.increment();
-      HRegion region = getRegion(request.getRegion());
-
-      GetResponse.Builder builder = GetResponse.newBuilder();
-      ClientProtos.Get get = request.getGet();
-      Boolean existence = null;
-      Result r = null;
-
-      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
-        if (get.getColumnCount() != 1) {
-          throw new DoNotRetryIOException(
-            "get ClosestRowBefore supports one and only one family now, not "
-              + get.getColumnCount() + " families");
-        }
-        byte[] row = get.getRow().toByteArray();
-        byte[] family = get.getColumn(0).getFamily().toByteArray();
-        r = region.getClosestRowBefore(row, family);
-      } else {
-        Get clientGet = ProtobufUtil.toGet(get);
-        if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
-          existence = region.getCoprocessorHost().preExists(clientGet);
-        }
-        if (existence == null) {
-          r = region.get(clientGet);
-          if (get.getExistenceOnly()) {
-            boolean exists = r.getExists();
-            if (region.getCoprocessorHost() != null) {
-              exists = region.getCoprocessorHost().postExists(clientGet, exists);
-            }
-            existence = exists;
-          }
-        }
-      }
-      if (existence != null){
-        ClientProtos.Result pbr = ProtobufUtil.toResult(existence);
-        builder.setResult(pbr);
-      } else  if (r != null) {
-        ClientProtos.Result pbr = ProtobufUtil.toResult(r);
-        builder.setResult(pbr);
-      }
-      return builder.build();
-    } catch (IOException ie) {
-      throw new ServiceException(ie);
-    } finally {
-      metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
-    }
+  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
+    return regionFavoredNodesMap.get(encodedRegionName);
   }
 
-
-  /**
-   * Mutate data in a table.
-   *
-   * @param rpcc the RPC controller
-   * @param request the mutate request
-   * @throws ServiceException
-   */
   @Override
-  public MutateResponse mutate(final RpcController rpcc,
-      final MutateRequest request) throws ServiceException {
-    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
-    // It is also the conduit via which we pass back data.
-    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
-    CellScanner cellScanner = controller != null? controller.cellScanner(): null;
-    // Clear scanner so we are not holding on to reference across call.
-    if (controller != null) controller.setCellScanner(null);
-    try {
-      checkOpen();
-      requestCount.increment();
-      HRegion region = getRegion(request.getRegion());
-      MutateResponse.Builder builder = MutateResponse.newBuilder();
-      MutationProto mutation = request.getMutation();
-      if (!region.getRegionInfo().isMetaTable()) {
-        cacheFlusher.reclaimMemStoreMemory();
-      }
-      long nonceGroup = request.hasNonceGroup()
-          ? request.getNonceGroup() : HConstants.NO_NONCE;
-      Result r = null;
-      Boolean processed = null;
-      MutationType type = mutation.getMutateType();
-      switch (type) {
-      case APPEND:
-        // TODO: this doesn't actually check anything.
-        r = append(region, mutation, cellScanner, nonceGroup);
-        break;
-      case INCREMENT:
-        // TODO: this doesn't actually check anything.
-        r = increment(region, mutation, cellScanner, nonceGroup);
-        break;
-      case PUT:
-        Put put = ProtobufUtil.toPut(mutation, cellScanner);
-        if (request.hasCondition()) {
-          Condition condition = request.getCondition();
-          byte[] row = condition.getRow().toByteArray();
-          byte[] family = condition.getFamily().toByteArray();
-          byte[] qualifier = condition.getQualifier().toByteArray();
-          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
-          ByteArrayComparable comparator =
-            ProtobufUtil.toComparator(condition.getComparator());
-          if (region.getCoprocessorHost() != null) {
-            processed = region.getCoprocessorHost().preCheckAndPut(
-              row, family, qualifier, compareOp, comparator, put);
-          }
-          if (processed == null) {
-            boolean result = region.checkAndMutate(row, family,
-              qualifier, compareOp, comparator, put, true);
-            if (region.getCoprocessorHost() != null) {
-              result = region.getCoprocessorHost().postCheckAndPut(row, family,
-                qualifier, compareOp, comparator, put, result);
-            }
-            processed = result;
-          }
-        } else {
-          region.put(put);
-          processed = Boolean.TRUE;
-        }
-        break;
-      case DELETE:
-        Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
-        if (request.hasCondition()) {
-          Condition condition = request.getCondition();
-          byte[] row = condition.getRow().toByteArray();
-          byte[] family = condition.getFamily().toByteArray();
-          byte[] qualifier = condition.getQualifier().toByteArray();
-          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
-          ByteArrayComparable comparator =
-            ProtobufUtil.toComparator(condition.getComparator());
-          if (region.getCoprocessorHost() != null) {
-            processed = region.getCoprocessorHost().preCheckAndDelete(
-              row, family, qualifier, compareOp, comparator, delete);
-          }
-          if (processed == null) {
-            boolean result = region.checkAndMutate(row, family,
-              qualifier, compareOp, comparator, delete, true);
-            if (region.getCoprocessorHost() != null) {
-              result = region.getCoprocessorHost().postCheckAndDelete(row, family,
-                qualifier, compareOp, comparator, delete, result);
-            }
-            processed = result;
-          }
-        } else {
-          region.delete(delete);
-          processed = Boolean.TRUE;
-        }
-        break;
-        default:
-          throw new DoNotRetryIOException(
-            "Unsupported mutate type: " + type.name());
-      }
-      if (processed != null) builder.setProcessed(processed.booleanValue());
-      addResult(builder, r, controller);
-      return builder.build();
-    } catch (IOException ie) {
-      checkFileSystem();
-      throw new ServiceException(ie);
-    }
+  public ServerNonceManager getNonceManager() {
+    return this.nonceManager;
   }
 
+  private static class MovedRegionInfo {
+    private final ServerName serverName;
+    private final long seqNum;
+    private final long ts;
 
-  /**
-   * @return True if current call supports cellblocks
-   */
-  private boolean isClientCellBlockSupport() {
-    RpcCallContext context = RpcServer.getCurrentCall();
-    return context != null && context.isClientCellBlockSupport();
-  }
+    public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
+      this.serverName = serverName;
+      this.seqNum = closeSeqNum;
+      ts = EnvironmentEdgeManager.currentTimeMillis();
+     }
 
-  private void addResult(final MutateResponse.Builder builder,
-      final Result result, final PayloadCarryingRpcController rpcc) {
-    if (result == null) return;
-    if (isClientCellBlockSupport()) {
-      builder.setResult(ProtobufUtil.toResultNoData(result));
-      rpcc.setCellScanner(result.cellScanner());
-    } else {
-      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
-      builder.setResult(pbr);
+    public ServerName getServerName() {
+      return serverName;
     }
-  }
 
-  //
-  // remote scanner interface
-  //
+    public long getSeqNum() {
+      return seqNum;
+    }
 
-  /**
-   * Scan data in a table.
-   *
-   * @param controller the RPC controller
-   * @param request the scan request
-   * @throws ServiceException
-   */
-  @Override
-  public ScanResponse scan(final RpcController controller, final ScanRequest request)
-  throws ServiceException {
-    Leases.Lease lease = null;
-    String scannerName = null;
-    try {
-      if (!request.hasScannerId() && !request.hasScan()) {
-        throw new DoNotRetryIOException(
-          "Missing required input: scannerId or scan");
-      }
-      long scannerId = -1;
-      if (request.hasScannerId()) {
-        scannerId = request.getScannerId();
-        scannerName = String.valueOf(scannerId);
-      }
-      try {
-        checkOpen();
-      } catch (IOException e) {
-        // If checkOpen failed, server not running or filesystem gone,
-        // cancel this lease; filesystem is gone or we're closing or something.
-        if (scannerName != null) {
-          try {
-            leases.cancelLease(scannerName);
-          } catch (LeaseException le) {
-            LOG.info("Server shutting down and client tried to access missing scanner " +
-              scannerName);
-          }
-        }
-        throw e;
-      }
-      requestCount.increment();
+    public long getMoveTime() {
+      return ts;
+    }
+  }
 
-      int ttl = 0;
-      HRegion region = null;
-      RegionScanner scanner = null;
-      RegionScannerHolder rsh = null;
-      boolean moreResults = true;
-      boolean closeScanner = false;
-      ScanResponse.Builder builder = ScanResponse.newBuilder();
-      if (request.hasCloseScanner()) {
-        closeScanner = request.getCloseScanner();
-      }
-      int rows = 1;
-      if (request.hasNumberOfRows()) {
-        rows = request.getNumberOfRows();
-      }
-      if (request.hasScannerId()) {
-        rsh = scanners.get(scannerName);
-        if (rsh == null) {
-          LOG.info("Client tried to access missing scanner " + scannerName);
-          throw new UnknownScannerException(
-            "Name: " + scannerName + ", already closed?");
-        }
-        scanner = rsh.s;
-        HRegionInfo hri = scanner.getRegionInfo();
-        region = getRegion(hri.getRegionName());
-        if (region != rsh.r) { // Yes, should be the same instance
-          throw new NotServingRegionException("Region was re-opened after the scanner"
-            + scannerName + " was created: " + hri.getRegionNameAsString());
-        }
-      } else {
-        region = getRegion(request.getRegion());
-        ClientProtos.Scan protoScan = request.getScan();
-        boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
-        Scan scan = ProtobufUtil.toScan(protoScan);
-        // if the request doesn't set this, get the default region setting.
-        if (!isLoadingCfsOnDemandSet) {
-          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
-        }
-        scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
-        region.prepareScanner(scan);
-        if (region.getCoprocessorHost() != null) {
-          scanner = region.getCoprocessorHost().preScannerOpen(scan);
-        }
-        if (scanner == null) {
-          scanner = region.getScanner(scan);
-        }
-        if (region.getCoprocessorHost() != null) {
-          scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
-        }
-        scannerId = addScanner(scanner, region);
-        scannerName = String.valueOf(scannerId);
-        ttl = this.scannerLeaseTimeoutPeriod;
-      }
-
-      if (rows > 0) {
-        // if nextCallSeq does not match throw Exception straight away. This needs to be
-        // performed even before checking of Lease.
-        // See HBASE-5974
-        if (request.hasNextCallSeq()) {
-          if (rsh == null) {
-            rsh = scanners.get(scannerName);
-          }
-          if (rsh != null) {
-            if (request.getNextCallSeq() != rsh.nextCallSeq) {
-              throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
-                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
-                "; request=" + TextFormat.shortDebugString(request));
-            }
-            // Increment the nextCallSeq value which is the next expected from client.
-            rsh.nextCallSeq++;
-          }
-        }
-        try {
-          // Remove lease while its being processed in server; protects against case
-          // where processing of request takes > lease expiration time.
-          lease = leases.removeLease(scannerName);
-          List<Result> results = new ArrayList<Result>(rows);
-          long currentScanResultSize = 0;
-
-          boolean done = false;
-          // Call coprocessor. Get region info from scanner.
-          if (region != null && region.getCoprocessorHost() != null) {
-            Boolean bypass = region.getCoprocessorHost().preScannerNext(
-              scanner, results, rows);
-            if (!results.isEmpty()) {
-              for (Result r : results) {
-                if (maxScannerResultSize < Long.MAX_VALUE){
-                  for (Cell kv : r.rawCells()) {
-                    // TODO
-                    currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
-                  }
-                }
-              }
-            }
-            if (bypass != null && bypass.booleanValue()) {
-              done = true;
-            }
-          }
-
-          if (!done) {
-            long maxResultSize = scanner.getMaxResultSize();
-            if (maxResultSize <= 0) {
-              maxResultSize = maxScannerResultSize;
-            }
-            List<Cell> values = new ArrayList<Cell>();
-            region.startRegionOperation(Operation.SCAN);
-            try {
-              int i = 0;
-              synchronized(scanner) {
-                for (; i < rows
-                    && currentScanResultSize < maxResultSize; ) {
-                  // Collect values to be returned here
-                  boolean moreRows = scanner.nextRaw(values);
-                  if (!values.isEmpty()) {
-                    if (maxScannerResultSize < Long.MAX_VALUE){
-                      for (Cell kv : values) {
-                        currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
-                      }
-                    }
-                    results.add(Result.create(values));
-                    i++;
-                  }
-                  if (!moreRows) {
-                    break;
-                  }
-                  values.clear();
-                }
-              }
-              region.readRequestsCount.add(i);
-            } finally {
-              region.closeRegionOperation();
-            }
-
-            // coprocessor postNext hook
-            if (region != null && region.getCoprocessorHost() != null) {
-              region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
-            }
-          }
-
-          // If the scanner's filter - if any - is done with the scan
-          // and wants to tell the client to stop the scan. This is done by passing
-          // a null result, and setting moreResults to false.
-          if (scanner.isFilterDone() && results.isEmpty()) {
-            moreResults = false;
-            results = null;
-          } else {
-            addResults(builder, results, controller);
-          }
-        } finally {
-          // We're done. On way out re-add the above removed lease.
-          // Adding resets expiration time on lease.
-          if (scanners.containsKey(scannerName)) {
-            if (lease != null) leases.addLease(lease);
-            ttl = this.scannerLeaseTimeoutPeriod;
-          }
-        }
-      }
-
-      if (!moreResults || closeScanner) {
-        ttl = 0;
-        moreResults = false;
-        if (region != null && region.getCoprocessorHost() != null) {
-          if (region.getCoprocessorHost().preScannerClose(scanner)) {
-            return builder.build(); // bypass
-          }
-        }
-        rsh = scanners.remove(scannerName);
-        if (rsh != null) {
-          scanner = rsh.s;
-          scanner.close();
-          leases.cancelLease(scannerName);
-          if (region != null && region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().postScannerClose(scanner);
-          }
-        }
-      }
-
-      if (ttl > 0) {
-        builder.setTtl(ttl);
-      }
-      builder.setScannerId(scannerId);
-      builder.setMoreResults(moreResults);
-      return builder.build();
-    } catch (IOException ie) {
-      if (scannerName != null && ie instanceof NotServingRegionException) {
-        RegionScannerHolder rsh = scanners.remove(scannerName);
-        if (rsh != null) {
-          try {
-            RegionScanner scanner = rsh.s;
-            scanner.close();
-            leases.cancelLease(scannerName);
-          } catch (IOException e) {}
-        }
-      }
-      throw new ServiceException(ie);
-    }
-  }
-
-  private void addResults(final ScanResponse.Builder builder, final List<Result> results,
-      final RpcController controller) {
-    if (results == null || results.isEmpty()) return;
-    if (isClientCellBlockSupport()) {
-      for (Result res : results) {
-        builder.addCellsPerResult(res.size());
-      }
-      ((PayloadCarryingRpcController)controller).
-        setCellScanner(CellUtil.createCellScanner(results));
-    } else {
-      for (Result res: results) {
-        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
-        builder.addResults(pbr);
-      }
-    }
-  }
-
-  /**
-   * Atomically bulk load several HFiles into an open region
-   * @return true if successful, false is failed but recoverably (no action)
-   * @throws IOException if failed unrecoverably
-   */
-  @Override
-  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
-      final BulkLoadHFileRequest request) throws ServiceException {
-    try {
-      checkOpen();
-      requestCount.increment();
-      HRegion region = getRegion(request.getRegion());
-      List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
-      for (FamilyPath familyPath: request.getFamilyPathList()) {
-        familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
-          familyPath.getPath()));
-      }
-      boolean bypass = false;
-      if (region.getCoprocessorHost() != null) {
-        bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
-      }
-      boolean loaded = false;
-      if (!bypass) {
-        loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
-      }
-      if (region.getCoprocessorHost() != null) {
-        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
-      }
-      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
-      builder.setLoaded(loaded);
-      return builder.build();
-    } catch (IOException ie) {
-      throw new ServiceException(ie);
-    }
-  }
-
-  @Override
-  public CoprocessorServiceResponse execService(final RpcController controller,
-      final CoprocessorServiceRequest request) throws ServiceException {
-    try {
-      checkOpen();
-      requestCount.increment();
-      HRegion region = getRegion(request.getRegion());
-      Message result = execServiceOnRegion(region, request.getCall());
-      CoprocessorServiceResponse.Builder builder =
-          CoprocessorServiceResponse.newBuilder();
-      builder.setRegion(RequestConverter.buildRegionSpecifier(
-          RegionSpecifierType.REGION_NAME, region.getRegionName()));
-      builder.setValue(
-          builder.getValueBuilder().setName(result.getClass().getName())
-              .setValue(result.toByteString()));
-      return builder.build();
-    } catch (IOException ie) {
-      throw new ServiceException(ie);
-    }
-  }
-
-  private Message execServiceOnRegion(HRegion region,
-      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
-    // ignore the passed in controller (from the serialized call)
-    ServerRpcController execController = new ServerRpcController();
-    Message result = region.execService(execController, serviceCall);
-    if (execController.getFailedOn() != null) {
-      throw execController.getFailedOn();
-    }
-    return result;
-  }
-
-  /**
-   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
-   *
-   * @param rpcc the RPC controller
-   * @param request the multi request
-   * @throws ServiceException
-   */
-  @Override
-  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
-  throws ServiceException {
-    try {
-      checkOpen();
-    } catch (IOException ie) {
-      throw new ServiceException(ie);
-    }
-
-    // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
-    // It is also the conduit via which we pass back data.
-    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
-    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
-    if (controller != null) controller.setCellScanner(null);
-
-    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
-
-    // this will contain all the cells that we need to return. It's created later, if needed.
-    List<CellScannable> cellsToReturn = null;
-    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
-    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
-
-    for (RegionAction regionAction : request.getRegionActionList()) {
-      this.requestCount.add(regionAction.getActionCount());
-      HRegion region;
-      regionActionResultBuilder.clear();
-      try {
-        region = getRegion(regionAction.getRegion());
-      } catch (IOException e) {
-        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
-        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
-        continue;  // For this region it's a failure.
-      }
-
-      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
-        // How does this call happen?  It may need some work to play well w/ the surroundings.
-        // Need to return an item per Action along w/ Action index.  TODO.
-        try {
-          mutateRows(region, regionAction.getActionList(), cellScanner);
-        } catch (IOException e) {
-          // As it's atomic, we may expect it's a global failure.
-          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
-        }
-      } else {
-        // doNonAtomicRegionMutation manages the exception internally
-        cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
-            regionActionResultBuilder, cellsToReturn, nonceGroup);
-      }
-      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
-    }
-    // Load the controller with the Cells to return.
-    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
-      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
-    }
-    return responseBuilder.build();
-  }
-
-  /**
-   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
-   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
-   * @param region
-   * @param actions
-   * @param cellScanner
-   * @param builder
-   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
-   * method returns as a 'result'.
-   * @return Return the <code>cellScanner</code> passed
-   */
-  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
-      final RegionAction actions, final CellScanner cellScanner,
-      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
-    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
-    // one at a time, we instead pass them in batch.  Be aware that the corresponding
-    // ResultOrException instance that matches each Put or Delete is then added down in the
-    // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
-    List<ClientProtos.Action> mutations = null;
-    for (ClientProtos.Action action: actions.getActionList()) {
-      ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
-      try {
-        Result r = null;
-        if (action.hasGet()) {
-          Get get = ProtobufUtil.toGet(action.getGet());
-          r = region.get(get);
-        } else if (action.hasServiceCall()) {
-          resultOrExceptionBuilder = ResultOrException.newBuilder();
-          try {
-            Message result = execServiceOnRegion(region, action.getServiceCall());
-            ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
-                ClientProtos.CoprocessorServiceResult.newBuilder();
-            resultOrExceptionBuilder.setServiceResult(
-                serviceResultBuilder.setValue(
-                  serviceResultBuilder.getValueBuilder()
-                    .setName(result.getClass().getName())
-                    .setValue(result.toByteString())));
-          } catch (IOException ioe) {
-            resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
-          }
-        } else if (action.hasMutation()) {
-          MutationType type = action.getMutation().getMutateType();
-          if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
-              !mutations.isEmpty()) {
-            // Flush out any Puts or Deletes already collected.
-            doBatchOp(builder, region, mutations, cellScanner);
-            mutations.clear();
-          }
-          switch (type) {
-          case APPEND:
-            r = append(region, action.getMutation(), cellScanner, nonceGroup);
-            break;
-          case INCREMENT:
-            r = increment(region, action.getMutation(), cellScanner,  nonceGroup);
-            break;
-          case PUT:
-          case DELETE:
-            // Collect the individual mutations and apply in a batch
-            if (mutations == null) {
-              mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
-            }
-            mutations.add(action);
-            break;
-          default:
-            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
-          }
-        } else {
-          throw new HBaseIOException("Unexpected Action type");
-        }
-        if (r != null) {
-          ClientProtos.Result pbResult = null;
-          if (isClientCellBlockSupport()) {
-            pbResult = ProtobufUtil.toResultNoData(r);
-            //  Hard to guess the size here.  Just make a rough guess.
-            if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
-            cellsToReturn.add(r);
-          } else {
-            pbResult = ProtobufUtil.toResult(r);
-          }
-          resultOrExceptionBuilder =
-            ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
-        }
-        // Could get to here and there was no result and no exception.  Presumes we added

[... 1014 lines stripped ...]


Mime
View raw message