hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [49/51] [partial] hbase git commit: HBASE-16264 Figure how to deal with endpoints and shaded pb Shade our protobufs. Do it in a manner that makes it so we can still have in our API references to com.google.protobuf (and in REST). The c.g.p in API is for
Date Thu, 29 Sep 2016 19:38:03 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index 6a02e18..b97743f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -21,20 +21,22 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
-
-import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Implementations make an rpc call against a RegionService via a protobuf Service.
- * Implement rpcCall(). Be sure to make use of the RpcController that this instance is carrying
- * via {@link #getRpcController()}.
+ * Implementations make a RPC call against a RegionService via a protobuf Service.
+ * Implement rpcCall() and the parent class setClientByServiceName; this latter is where the
+ * RPC stub gets set (the appropriate protobuf 'Service'/Client). Be sure to make use of the
+ * RpcController that this instance is carrying via #getRpcController().
  *
  * <p>TODO: this class is actually tied to one region, because most of the paths make use of
  *       the regioninfo part of location when building requests. The only reason it works for
@@ -42,74 +44,75 @@ import com.google.protobuf.RpcController;
  *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
  *       RegionCallable and actual RegionServerCallable with ServerName.
  *
- * @param <T> the class that the ServerCallable handles
+ * @param <T> The class that the ServerCallable handles.
+ * @param <S> The protocol to use (Admin or Client or even an Endpoint over in MetaTableAccessor).
  */
+// TODO: MasterCallable and this Class have a lot in common. UNIFY!
+// Public but should be package private only it is used by MetaTableAccessor. FIX!!
 @InterfaceAudience.Private
-public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> {
-  private ClientService.BlockingInterface stub;
-
-  /* This is 99% of the time a PayloadCarryingRpcController but this RegionServerCallable is
-   * also used doing Coprocessor Endpoints and in this case, it is a ServerRpcControllable which is
-   * not a PayloadCarryingRpcController. Too hard to untangle it all at this stage since
-   * downstreamers are using RegionServerCallable invoking CPEPs so just do ugly instanceof
-   * checks in the below.
+public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> {
+  private final Connection connection;
+  private final TableName tableName;
+  private final byte[] row;
+  /**
+   * Some subclasses want to set their own location. Make it protected.
    */
-  private final RpcController rpcController;
+  protected HRegionLocation location;
+  protected final static int MIN_WAIT_DEAD_SERVER = 10000;
+  protected S stub;
+
+  /**
+   * This is 99% of the time a HBaseRpcController but also used doing Coprocessor Endpoints and in
+   * this case, it is a ServerRpcControllable which is not a HBaseRpcController.
+   * Can be null!
+   */
+  protected final RpcController rpcController;
 
   /**
    * @param connection Connection to use.
+   * @param rpcController Controller to use; can be shaded or non-shaded.
    * @param tableName Table name to which <code>row</code> belongs.
    * @param row The row we want in <code>tableName</code>.
    */
-  public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
-      TableName tableName, byte [] row) {
-    this(connection, rpcControllerFactory.newController(), tableName, row);
-  }
-
-  public RegionServerCallable(Connection connection, RpcController rpcController,
-      TableName tableName, byte [] row) {
-    super(connection, tableName, row);
+  public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
+      RpcController rpcController) {
+    super();
+    this.connection = connection;
+    this.tableName = tableName;
+    this.row = row;
     this.rpcController = rpcController;
   }
 
-  void setClientByServiceName(ServerName service) throws IOException {
-    this.setStub(getConnection().getClient(service));
+  protected RpcController getRpcController() {
+    return this.rpcController;
   }
 
-  /**
-   * @return Client Rpc protobuf communication stub
-   */
-  protected ClientService.BlockingInterface getStub() {
-    return this.stub;
+  protected void setStub(S stub) {
+    this.stub = stub;
   }
 
-  /**
-   * Set the client protobuf communication stub
-   * @param stub to set
-   */
-  void setStub(final ClientService.BlockingInterface stub) {
-    this.stub = stub;
+  protected S getStub() {
+    return this.stub;
   }
 
   /**
-   * Override that changes call Exception from {@link Exception} to {@link IOException}. It also
-   * does setup of an rpcController and calls through to the unimplemented
-   * rpcCall() method. If rpcController is an instance of PayloadCarryingRpcController,
-   * we will set a timeout on it.
+   * Override that changes call Exception from {@link Exception} to {@link IOException}.
+   * Also does set up of the rpcController.
    */
-  @Override
   public T call(int callTimeout) throws IOException {
     try {
-      if (this.rpcController != null) {
+      // Iff non-null and an instance of a SHADED rpcController, do config! Unshaded -- i.e.
+      // com.google.protobuf.RpcController or null -- will just skip over this config.
+      if (getRpcController() != null) {
+        RpcController shadedRpcController = (RpcController)getRpcController();
         // Do a reset to clear previous states, such as CellScanner.
-        this.rpcController.reset();
-        if (this.rpcController instanceof HBaseRpcController) {
-          HBaseRpcController pcrc = (HBaseRpcController)this.rpcController;
-          // If it is an instance of PayloadCarryingRpcController, we can set priority on the
-          // controller based off the tableName. RpcController may be null in tests when mocking so allow
-          // for null controller.
-          pcrc.setPriority(tableName);
-          pcrc.setCallTimeout(callTimeout);
+        shadedRpcController.reset();
+        if (shadedRpcController instanceof HBaseRpcController) {
+          HBaseRpcController hrc = (HBaseRpcController)getRpcController();
+          // If it is an instance of HBaseRpcController, we can set priority on the controller based
+          // off the tableName. Set call timeout too.
+          hrc.setPriority(tableName);
+          hrc.setCallTimeout(callTimeout);
         }
       }
       return rpcCall();
@@ -128,23 +131,98 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
    */
   protected abstract T rpcCall() throws Exception;
 
-  protected RpcController getRpcController() {
-    return this.rpcController;
-  }
-
   /**
    * Get the RpcController CellScanner.
-   * If the RpcController is a PayloadCarryingRpcController, which it is in all cases except
+   * If the RpcController is a HBaseRpcController, which it is in all cases except
    * when we are processing Coprocessor Endpoint, then this method returns a reference to the
-   * CellScanner that the PayloadCarryingRpcController is carrying. Do it up here in this Callable
-   * so we don't have to scatter ugly instanceof tests around the codebase. Will fail if called in
-   * a Coproccessor Endpoint context. Should never happen.
+   * CellScanner that the HBaseRpcController is carrying. Do it up here in this Callable
+   * so we don't have to scatter ugly instanceof tests around the codebase. Will return null
+   * if called in a Coproccessor Endpoint context. Should never happen.
    */
   protected CellScanner getRpcControllerCellScanner() {
-    return ((HBaseRpcController)this.rpcController).cellScanner();
+    return (getRpcController() != null && getRpcController() instanceof HBaseRpcController)?
+        ((HBaseRpcController)getRpcController()).cellScanner(): null;
   }
 
   protected void setRpcControllerCellScanner(CellScanner cellScanner) {
-    ((HBaseRpcController)this.rpcController).setCellScanner(cellScanner);
+    if (getRpcController() != null && getRpcController() instanceof HBaseRpcController) {
+      ((HBaseRpcController)this.rpcController).setCellScanner(cellScanner);
+    }
+  }
+
+  /**
+   * @return {@link ClusterConnection} instance used by this Callable.
+   */
+  protected ClusterConnection getConnection() {
+    return (ClusterConnection) this.connection;
+  }
+
+  protected HRegionLocation getLocation() {
+    return this.location;
   }
+
+  protected void setLocation(final HRegionLocation location) {
+    this.location = location;
+  }
+
+  public TableName getTableName() {
+    return this.tableName;
+  }
+
+  public byte [] getRow() {
+    return this.row;
+  }
+
+  public void throwable(Throwable t, boolean retrying) {
+    if (location != null) {
+      getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),
+          row, t, location.getServerName());
+    }
+  }
+
+  public String getExceptionMessageAdditionalDetail() {
+    return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location;
+  }
+
+  public long sleep(long pause, int tries) {
+    long sleep = ConnectionUtils.getPauseTime(pause, tries);
+    if (sleep < MIN_WAIT_DEAD_SERVER
+        && (location == null || getConnection().isDeadServer(location.getServerName()))) {
+      sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
+    }
+    return sleep;
+  }
+
+  /**
+   * @return the HRegionInfo for the current region
+   */
+  public HRegionInfo getHRegionInfo() {
+    if (this.location == null) {
+      return null;
+    }
+    return this.location.getRegionInfo();
+  }
+
+  public void prepare(final boolean reload) throws IOException {
+    // check table state if this is a retry
+    if (reload && !tableName.equals(TableName.META_TABLE_NAME) &&
+        getConnection().isTableDisabled(tableName)) {
+      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
+    }
+    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
+      this.location = regionLocator.getRegionLocation(row);
+    }
+    if (this.location == null) {
+      throw new IOException("Failed to find location, tableName=" + tableName +
+          ", row=" + Bytes.toString(row) + ", reload=" + reload);
+    }
+    setStubByServiceName(this.location.getServerName());
+  }
+
+  /**
+   * Set the RCP client stub
+   * @param serviceName to get the rpc stub for
+   * @throws IOException When client could not be created
+   */
+  protected abstract void setStubByServiceName(ServerName serviceName) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 98792e7..9220f12 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
index afbcc9a..029ee9b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
@@ -19,15 +19,49 @@
 
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
- * A Callable&lt;T&gt; that will be retried.  If {@link #call(int)} invocation throws exceptions,
+ * A Callable&lt;T&gt; that will be retried. If {@link #call(int)} invocation throws exceptions,
  * we will call {@link #throwable(Throwable, boolean)} with whatever the exception was.
  * @param <T> result class from executing <tt>this</tt>
  */
 @InterfaceAudience.Private
-public interface RetryingCallable<T> extends RetryingCallableBase {
+public interface RetryingCallable<T> {
+  /**
+   * Prepare by setting up any connections to servers, etc., ahead of call invocation.
+   * TODO: We call prepare before EVERY call. Seems wrong. FIX!!!!
+   * @param reload Set this to true if need to requery locations
+   * @throws IOException e
+   */
+  void prepare(final boolean reload) throws IOException;
+
+  /**
+   * Called when call throws an exception and we are going to retry; take action to
+   * make it so we succeed on next call (clear caches, do relookup of locations, etc.).
+   * @param t        throwable which was thrown
+   * @param retrying True if we are in retrying mode (we are not in retrying mode when max
+   *                 retries == 1; we ARE in retrying mode if retries &gt; 1 even when we are the
+   *                 last attempt)
+   */
+  void throwable(final Throwable t, boolean retrying);
+
+  /**
+   * @return Some details from the implementation that we would like to add to a terminating
+   *         exception; i.e. a fatal exception is being thrown ending retries and we might like to
+   *         add more implementation-specific detail on to the exception being thrown.
+   */
+  String getExceptionMessageAdditionalDetail();
+
+  /**
+   * @param pause time to pause
+   * @param tries amount of tries until till sleep
+   * @return Suggestion on how much to sleep between retries
+   */
+  long sleep(final long pause, final int tries);
+
   /**
    * Computes a result, or throws an exception if unable to do so.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java
deleted file mode 100644
index 483f6c2..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * All generic methods for a Callable that can be retried. It is extended with Sync and
- * Async versions.
- */
-@InterfaceAudience.Private
-public interface RetryingCallableBase {
-  /**
-   * Prepare by setting up any connections to servers, etc., ahead of call invocation.
-   * @param reload Set this to true if need to requery locations
-   * @throws IOException e
-   */
-  void prepare(final boolean reload) throws IOException;
-
-  /**
-   * Called when call throws an exception and we are going to retry; take action to
-   * make it so we succeed on next call (clear caches, do relookup of locations, etc.).
-   * @param t        throwable which was thrown
-   * @param retrying True if we are in retrying mode (we are not in retrying mode when max
-   *                 retries == 1; we ARE in retrying mode if retries &gt; 1 even when we are the
-   *                 last attempt)
-   */
-  void throwable(final Throwable t, boolean retrying);
-
-  /**
-   * @return Some details from the implementation that we would like to add to a terminating
-   *         exception; i.e. a fatal exception is being thrown ending retries and we might like to
-   *         add more implementation-specific detail on to the exception being thrown.
-   */
-  String getExceptionMessageAdditionalDetail();
-
-  /**
-   * @param pause time to pause
-   * @param tries amount of tries until till sleep
-   * @return Suggestion on how much to sleep between retries
-   */
-  long sleep(final long pause, final int tries);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
index b0ba9f5..985d938 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
@@ -49,8 +49,7 @@ abstract class RetryingCallerInterceptorContext {
    * @return A new {@link RetryingCallerInterceptorContext} object that can be
    *         used for use in the current retrying call
    */
-  public abstract RetryingCallerInterceptorContext prepare(
-      RetryingCallableBase callable);
+  public abstract RetryingCallerInterceptorContext prepare(RetryingCallable<?> callable);
 
   /**
    * Telescopic extension that takes which of the many retries we are currently
@@ -64,6 +63,5 @@ abstract class RetryingCallerInterceptorContext {
    * @return A new context object that can be used for use in the current
    *         retrying call
    */
-  public abstract RetryingCallerInterceptorContext prepare(
-      RetryingCallableBase callable, int tries);
-}
+  public abstract RetryingCallerInterceptorContext prepare(RetryingCallable<?> callable, int tries);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index a5bebd0..c7d78c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -103,23 +103,23 @@ public class ReversedScannerCallable extends ScannerCallable {
       if (locateStartRow == null) {
         // Just locate the region with the row
         RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
-            getConnection(), tableName, row);
+            getConnection(), getTableName(), getRow());
         this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
         if (this.location == null) {
           throw new IOException("Failed to find location, tableName="
-              + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
+              + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
               + reload);
         }
       } else {
         // Need to locate the regions with the range, and the target location is
         // the last one which is the previous region of last region scanner
         List<HRegionLocation> locatedRegions = locateRegionsInRange(
-            locateStartRow, row, reload);
+            locateStartRow, getRow(), reload);
         if (locatedRegions.isEmpty()) {
           throw new DoNotRetryIOException(
               "Does hbase:meta exist hole? Couldn't get regions for the range from "
                   + Bytes.toStringBinary(locateStartRow) + " to "
-                  + Bytes.toStringBinary(row));
+                  + Bytes.toStringBinary(getRow()));
         }
         this.location = locatedRegions.get(locatedRegions.size() - 1);
       }
@@ -159,7 +159,7 @@ public class ReversedScannerCallable extends ScannerCallable {
     byte[] currentKey = startKey;
     do {
       RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
-          getConnection(), tableName, currentKey);
+          getConnection(), getTableName(), currentKey);
       HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
       if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
         regionList.add(regionLocation);
@@ -176,7 +176,7 @@ public class ReversedScannerCallable extends ScannerCallable {
 
   @Override
   public ScannerCallable getScannerCallableForReplica(int id) {
-    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName,
+    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(),
         this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id);
     r.setCaching(this.getCaching());
     return r;

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
index 68a4aa2..75fec63 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
  * A RetryingCallable for RPC connection operations.

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
index cc2f159..e940143 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
 
-import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 
 /**
  * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 3d55136..04553d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -20,31 +20,27 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 
@@ -90,21 +86,14 @@ public class RpcRetryingCallerWithReadReplicas {
    * - we need to stop retrying when the call is completed
    * - we can be interrupted
    */
-  class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
+  class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> {
     final int id;
-    private final HBaseRpcController controller;
-
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
-      super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory,
-          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
+      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
+          rpcControllerFactory.newController());
       this.id = id;
       this.location = location;
-      this.controller = rpcControllerFactory.newController();
-    }
-
-    @Override
-    public void cancel() {
-      controller.startCancel();
     }
 
     /**
@@ -113,13 +102,12 @@ public class RpcRetryingCallerWithReadReplicas {
      * - set the location to the right region, depending on the replica.
      */
     @Override
+    // TODO: Very like the super class implemenation. Can we shrink this down?
     public void prepare(final boolean reload) throws IOException {
-      if (controller.isCanceled()) return;
-
+      if (getRpcController().isCanceled()) return;
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
-
       if (reload || location == null) {
         RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
         location = id < rl.size() ? rl.getRegionLocation(id) : null;
@@ -131,35 +119,27 @@ public class RpcRetryingCallerWithReadReplicas {
         throw new HBaseIOException("There is no location for replica id #" + id);
       }
 
-      ServerName dest = location.getServerName();
-
-      setStub(cConnection.getClient(dest));
+      setStubByServiceName(this.location.getServerName());
     }
 
-    private void initRpcController() {
-      controller.reset();
-      controller.setCallTimeout(callTimeout);
-      controller.setPriority(tableName);
-    }
     @Override
+    // TODO: Very like the super class implemenation. Can we shrink this down?
     protected Result rpcCall() throws Exception {
-      if (controller.isCanceled()) return null;
+      if (getRpcController().isCanceled()) return null;
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
       byte[] reg = location.getRegionInfo().getRegionName();
       ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
-      initRpcController();
-      ClientProtos.GetResponse response = getStub().get(controller, request);
+      HBaseRpcController hrc = (HBaseRpcController)getRpcController();
+      hrc.reset();
+      hrc.setCallTimeout(callTimeout);
+      hrc.setPriority(tableName);
+      ClientProtos.GetResponse response = getStub().get(hrc, request);
       if (response == null) {
         return null;
       }
-      return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return controller.isCanceled();
+      return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 22f611a..71a31db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -38,9 +38,9 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 8345aa1..0351e54 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -42,11 +42,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
@@ -57,7 +57,7 @@ import org.apache.hadoop.net.DNS;
  * {@link RpcRetryingCaller} so fails are retried.
  */
 @InterfaceAudience.Private
-public class ScannerCallable extends RegionServerCallable<Result[]> {
+public class ScannerCallable extends ClientServiceCallable<Result[]> {
   public static final String LOG_SCANNER_LATENCY_CUTOFF
     = "hbase.client.log.scanner.latency.cutoff";
   public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
@@ -119,7 +119,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
-    super(connection, rpcControllerFactory, tableName, scan.getStartRow());
+    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController());
     this.id = id;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
@@ -429,7 +429,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   }
 
   public ScannerCallable getScannerCallableForReplica(int id) {
-    ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
+    ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(),
         this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
     s.setCaching(this.caching);
     return s;
@@ -460,4 +460,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 5af8034..c8d9738 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -26,17 +26,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.security.token.Token;
 
 /**
@@ -54,8 +54,9 @@ public class SecureBulkLoadClient {
 
   public String prepareBulkLoad(final Connection conn) throws IOException {
     try {
-      RegionServerCallable<String> callable = new RegionServerCallable<String>(conn,
-          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+      ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn,
+          table.getName(), HConstants.EMPTY_START_ROW,
+          this.rpcControllerFactory.newController()) {
         @Override
         protected String rpcCall() throws Exception {
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
@@ -77,8 +78,8 @@ public class SecureBulkLoadClient {
 
   public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
     try {
-      RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
-          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
+          table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) {
         @Override
         protected Void rpcCall() throws Exception {
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
@@ -145,4 +146,4 @@ public class SecureBulkLoadClient {
       throw ProtobufUtil.handleRemoteException(se);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java
new file mode 100644
index 0000000..fa4e5f1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+
+/**
+ * Base class which provides clients with an RPC connection to
+ * call coprocessor endpoint {@link com.google.protobuf.Service}s.
+ * Note that clients should not use this class directly, except through
+ * {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+abstract class SyncCoprocessorRpcChannel implements CoprocessorRpcChannel {
+  private static final Log LOG = LogFactory.getLog(SyncCoprocessorRpcChannel.class);
+
+  @Override
+  @InterfaceAudience.Private
+  public void callMethod(Descriptors.MethodDescriptor method,
+                         RpcController controller,
+                         Message request, Message responsePrototype,
+                         RpcCallback<Message> callback) {
+    Message response = null;
+    try {
+      response = callExecService(controller, method, request, responsePrototype);
+    } catch (IOException ioe) {
+      LOG.warn("Call failed on IOException", ioe);
+      CoprocessorRpcUtils.setControllerException(controller, ioe);
+    }
+    if (callback != null) {
+      callback.run(response);
+    }
+  }
+
+  @Override
+  @InterfaceAudience.Private
+  public Message callBlockingMethod(Descriptors.MethodDescriptor method,
+                                    RpcController controller,
+                                    Message request, Message responsePrototype)
+      throws ServiceException {
+    try {
+      return callExecService(controller, method, request, responsePrototype);
+    } catch (IOException ioe) {
+      throw new ServiceException("Error calling method "+method.getFullName(), ioe);
+    }
+  }
+
+  protected abstract Message callExecService(RpcController controller,
+      Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
+          throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
index 5d4ac8e..71875a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java
@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
 /**
  * Represents table state.

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
deleted file mode 100644
index 594a459..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ /dev/null
@@ -1,823 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client.coprocessor;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
-import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-
-/**
- * This client class is for invoking the aggregate functions deployed on the
- * Region Server side via the AggregateService. This class will implement the
- * supporting functionality for summing/processing the individual results
- * obtained from the AggregateService for each region.
- * <p>
- * This will serve as the client side handler for invoking the aggregate
- * functions.
- * For all aggregate functions,
- * <ul>
- * <li>start row &lt; end row is an essential condition (if they are not
- * {@link HConstants#EMPTY_BYTE_ARRAY})
- * <li>Column family can't be null. In case where multiple families are
- * provided, an IOException will be thrown. An optional column qualifier can
- * also be defined.</li>
- * <li>For methods to find maximum, minimum, sum, rowcount, it returns the
- * parameter type. For average and std, it returns a double value. For row
- * count, it returns a long value.</li>
- * </ul>
- * <p>Call {@link #close()} when done.
- */
-@InterfaceAudience.Private
-public class AggregationClient implements Closeable {
-  // TODO: This class is not used.  Move to examples?
-  private static final Log log = LogFactory.getLog(AggregationClient.class);
-  private final Connection connection;
-
-  /**
-   * Constructor with Conf object
-   * @param cfg
-   */
-  public AggregationClient(Configuration cfg) {
-    try {
-      // Create a connection on construction. Will use it making each of the calls below.
-      this.connection = ConnectionFactory.createConnection(cfg);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (this.connection != null && !this.connection.isClosed()) {
-      this.connection.close();
-    }
-  }
-
-  /**
-   * It gives the maximum value of a column for a given column family for the
-   * given range. In case qualifier is null, a max of all values for the given
-   * family is returned.
-   * @param tableName
-   * @param ci
-   * @param scan
-   * @return max val &lt;R&gt;
-   * @throws Throwable
-   *           The caller is supposed to handle the exception as they are thrown
-   *           &amp; propagated to it.
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> R max(
-      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
-  throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
-      return max(table, ci, scan);
-    }
-  }
-
-  /**
-   * It gives the maximum value of a column for a given column family for the
-   * given range. In case qualifier is null, a max of all values for the given
-   * family is returned.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return max val &lt;&gt;
-   * @throws Throwable
-   *           The caller is supposed to handle the exception as they are thrown
-   *           &amp; propagated to it.
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> 
-  R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
-      final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
-    class MaxCallBack implements Batch.Callback<R> {
-      R max = null;
-
-      R getMax() {
-        return max;
-      }
-
-      @Override
-      public synchronized void update(byte[] region, byte[] row, R result) {
-        max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
-      }
-    }
-    MaxCallBack aMaxCallBack = new MaxCallBack();
-    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
-        new Batch.Call<AggregateService, R>() {
-          @Override
-          public R call(AggregateService instance) throws IOException {
-            ServerRpcController controller = new ServerRpcController();
-            BlockingRpcCallback<AggregateResponse> rpcCallback = 
-                new BlockingRpcCallback<AggregateResponse>();
-            instance.getMax(controller, requestArg, rpcCallback);
-            AggregateResponse response = rpcCallback.get();
-            if (controller.failedOnException()) {
-              throw controller.getFailedOn();
-            }
-            if (response.getFirstPartCount() > 0) {
-              ByteString b = response.getFirstPart(0);
-              Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
-              return ci.getCellValueFromProto(q);
-            }
-            return null;
-          }
-        }, aMaxCallBack);
-    return aMaxCallBack.getMax();
-  }
-
-  /*
-   * @param scan
-   * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
-   */
-  private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
-    if (scan == null
-        || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
-          scan.getStartRow(), HConstants.EMPTY_START_ROW))
-        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
-          scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
-      throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
-    } else if (!canFamilyBeAbsent) {
-      if (scan.getFamilyMap().size() != 1) {
-        throw new IOException("There must be only one family.");
-      }
-    }
-  }
-
-  /**
-   * It gives the minimum value of a column for a given column family for the
-   * given range. In case qualifier is null, a min of all values for the given
-   * family is returned.
-   * @param tableName
-   * @param ci
-   * @param scan
-   * @return min val &lt;R&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> R min(
-      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
-  throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
-      return min(table, ci, scan);
-    }
-  }
-
-  /**
-   * It gives the minimum value of a column for a given column family for the
-   * given range. In case qualifier is null, a min of all values for the given
-   * family is returned.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return min val &lt;R&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> 
-  R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
-      final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
-    class MinCallBack implements Batch.Callback<R> {
-
-      private R min = null;
-
-      public R getMinimum() {
-        return min;
-      }
-
-      @Override
-      public synchronized void update(byte[] region, byte[] row, R result) {
-        min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
-      }
-    }
-    MinCallBack minCallBack = new MinCallBack();
-    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
-        new Batch.Call<AggregateService, R>() {
-
-          @Override
-          public R call(AggregateService instance) throws IOException {
-            ServerRpcController controller = new ServerRpcController();
-            BlockingRpcCallback<AggregateResponse> rpcCallback = 
-                new BlockingRpcCallback<AggregateResponse>();
-            instance.getMin(controller, requestArg, rpcCallback);
-            AggregateResponse response = rpcCallback.get();
-            if (controller.failedOnException()) {
-              throw controller.getFailedOn();
-            }
-            if (response.getFirstPartCount() > 0) {
-              ByteString b = response.getFirstPart(0);
-              Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
-              return ci.getCellValueFromProto(q);
-            }
-            return null;
-          }
-        }, minCallBack);
-    log.debug("Min fom all regions is: " + minCallBack.getMinimum());
-    return minCallBack.getMinimum();
-  }
-
-  /**
-   * It gives the row count, by summing up the individual results obtained from
-   * regions. In case the qualifier is null, FirstKeyValueFilter is used to
-   * optimised the operation. In case qualifier is provided, I can't use the
-   * filter as it may set the flag to skip to next row, but the value read is
-   * not of the given filter: in this case, this particular row will not be
-   * counted ==&gt; an error.
-   * @param tableName
-   * @param ci
-   * @param scan
-   * @return &lt;R, S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
-      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
-  throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
-        return rowCount(table, ci, scan);
-    }
-  }
-
-  /**
-   * It gives the row count, by summing up the individual results obtained from
-   * regions. In case the qualifier is null, FirstKeyValueFilter is used to
-   * optimised the operation. In case qualifier is provided, I can't use the
-   * filter as it may set the flag to skip to next row, but the value read is
-   * not of the given filter: in this case, this particular row will not be
-   * counted ==&gt; an error.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return &lt;R, S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> 
-  long rowCount(final Table table,
-      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
-    class RowNumCallback implements Batch.Callback<Long> {
-      private final AtomicLong rowCountL = new AtomicLong(0);
-
-      public long getRowNumCount() {
-        return rowCountL.get();
-      }
-
-      @Override
-      public void update(byte[] region, byte[] row, Long result) {
-        rowCountL.addAndGet(result.longValue());
-      }
-    }
-    RowNumCallback rowNum = new RowNumCallback();
-    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
-        new Batch.Call<AggregateService, Long>() {
-          @Override
-          public Long call(AggregateService instance) throws IOException {
-            ServerRpcController controller = new ServerRpcController();
-            BlockingRpcCallback<AggregateResponse> rpcCallback = 
-                new BlockingRpcCallback<AggregateResponse>();
-            instance.getRowNum(controller, requestArg, rpcCallback);
-            AggregateResponse response = rpcCallback.get();
-            if (controller.failedOnException()) {
-              throw controller.getFailedOn();
-            }
-            byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
-            ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
-            bb.rewind();
-            return bb.getLong();
-          }
-        }, rowNum);
-    return rowNum.getRowNumCount();
-  }
-
-  /**
-   * It sums up the value returned from various regions. In case qualifier is
-   * null, summation of all the column qualifiers in the given family is done.
-   * @param tableName
-   * @param ci
-   * @param scan
-   * @return sum &lt;S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
-      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
-  throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
-        return sum(table, ci, scan);
-    }
-  }
-
-  /**
-   * It sums up the value returned from various regions. In case qualifier is
-   * null, summation of all the column qualifiers in the given family is done.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return sum &lt;S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> 
-  S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
-      final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
-    
-    class SumCallBack implements Batch.Callback<S> {
-      S sumVal = null;
-
-      public S getSumResult() {
-        return sumVal;
-      }
-
-      @Override
-      public synchronized void update(byte[] region, byte[] row, S result) {
-        sumVal = ci.add(sumVal, result);
-      }
-    }
-    SumCallBack sumCallBack = new SumCallBack();
-    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
-        new Batch.Call<AggregateService, S>() {
-          @Override
-          public S call(AggregateService instance) throws IOException {
-            ServerRpcController controller = new ServerRpcController();
-            BlockingRpcCallback<AggregateResponse> rpcCallback = 
-                new BlockingRpcCallback<AggregateResponse>();
-            instance.getSum(controller, requestArg, rpcCallback);
-            AggregateResponse response = rpcCallback.get();
-            if (controller.failedOnException()) {
-              throw controller.getFailedOn();
-            }
-            if (response.getFirstPartCount() == 0) {
-              return null;
-            }
-            ByteString b = response.getFirstPart(0);
-            T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
-            S s = ci.getPromotedValueFromProto(t);
-            return s;
-          }
-        }, sumCallBack);
-    return sumCallBack.getSumResult();
-  }
-
-  /**
-   * It computes average while fetching sum and row count from all the
-   * corresponding regions. Approach is to compute a global sum of region level
-   * sum and rowcount and then compute the average.
-   * @param tableName
-   * @param scan
-   * @throws Throwable
-   */
-  private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
-      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
-      throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
-        return getAvgArgs(table, ci, scan);
-    }
-  }
-
-  /**
-   * It computes average while fetching sum and row count from all the
-   * corresponding regions. Approach is to compute a global sum of region level
-   * sum and rowcount and then compute the average.
-   * @param table
-   * @param scan
-   * @throws Throwable
-   */
-  private <R, S, P extends Message, Q extends Message, T extends Message>
-  Pair<S, Long> getAvgArgs(final Table table,
-      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
-    class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
-      S sum = null;
-      Long rowCount = 0l;
-
-      public synchronized Pair<S, Long> getAvgArgs() {
-        return new Pair<S, Long>(sum, rowCount);
-      }
-
-      @Override
-      public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
-        sum = ci.add(sum, result.getFirst());
-        rowCount += result.getSecond();
-      }
-    }
-    AvgCallBack avgCallBack = new AvgCallBack();
-    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
-        new Batch.Call<AggregateService, Pair<S, Long>>() {
-          @Override
-          public Pair<S, Long> call(AggregateService instance) throws IOException {
-            ServerRpcController controller = new ServerRpcController();
-            BlockingRpcCallback<AggregateResponse> rpcCallback = 
-                new BlockingRpcCallback<AggregateResponse>();
-            instance.getAvg(controller, requestArg, rpcCallback);
-            AggregateResponse response = rpcCallback.get();
-            if (controller.failedOnException()) {
-              throw controller.getFailedOn();
-            }
-            Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
-            if (response.getFirstPartCount() == 0) {
-              return pair;
-            }
-            ByteString b = response.getFirstPart(0);
-            T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
-            S s = ci.getPromotedValueFromProto(t);
-            pair.setFirst(s);
-            ByteBuffer bb = ByteBuffer.allocate(8).put(
-                getBytesFromResponse(response.getSecondPart()));
-            bb.rewind();
-            pair.setSecond(bb.getLong());
-            return pair;
-          }
-        }, avgCallBack);
-    return avgCallBack.getAvgArgs();
-  }
-
-  /**
-   * This is the client side interface/handle for calling the average method for
-   * a given cf-cq combination. It was necessary to add one more call stack as
-   * its return type should be a decimal value, irrespective of what
-   * columninterpreter says. So, this methods collects the necessary parameters
-   * to compute the average and returs the double value.
-   * @param tableName
-   * @param ci
-   * @param scan
-   * @return &lt;R, S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message>
-  double avg(final TableName tableName,
-      final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
-    Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
-    return ci.divideForAvg(p.getFirst(), p.getSecond());
-  }
-
-  /**
-   * This is the client side interface/handle for calling the average method for
-   * a given cf-cq combination. It was necessary to add one more call stack as
-   * its return type should be a decimal value, irrespective of what
-   * columninterpreter says. So, this methods collects the necessary parameters
-   * to compute the average and returs the double value.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return &lt;R, S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
-      final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
-    Pair<S, Long> p = getAvgArgs(table, ci, scan);
-    return ci.divideForAvg(p.getFirst(), p.getSecond());
-  }
-
-  /**
-   * It computes a global standard deviation for a given column and its value.
-   * Standard deviation is square root of (average of squares -
-   * average*average). From individual regions, it obtains sum, square sum and
-   * number of rows. With these, the above values are computed to get the global
-   * std.
-   * @param table
-   * @param scan
-   * @return standard deviations
-   * @throws Throwable
-   */
-  private <R, S, P extends Message, Q extends Message, T extends Message>
-  Pair<List<S>, Long> getStdArgs(final Table table,
-      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
-    class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
-      long rowCountVal = 0l;
-      S sumVal = null, sumSqVal = null;
-
-      public synchronized Pair<List<S>, Long> getStdParams() {
-        List<S> l = new ArrayList<S>();
-        l.add(sumVal);
-        l.add(sumSqVal);
-        Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
-        return p;
-      }
-
-      @Override
-      public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
-        if (result.getFirst().size() > 0) {
-          sumVal = ci.add(sumVal, result.getFirst().get(0));
-          sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
-          rowCountVal += result.getSecond();
-        }
-      }
-    }
-    StdCallback stdCallback = new StdCallback();
-    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
-        new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
-          @Override
-          public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
-            ServerRpcController controller = new ServerRpcController();
-            BlockingRpcCallback<AggregateResponse> rpcCallback = 
-                new BlockingRpcCallback<AggregateResponse>();
-            instance.getStd(controller, requestArg, rpcCallback);
-            AggregateResponse response = rpcCallback.get();
-            if (controller.failedOnException()) {
-              throw controller.getFailedOn();
-            }
-            Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
-            if (response.getFirstPartCount() == 0) {
-              return pair;
-            }
-            List<S> list = new ArrayList<S>();
-            for (int i = 0; i < response.getFirstPartCount(); i++) {
-              ByteString b = response.getFirstPart(i);
-              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
-              S s = ci.getPromotedValueFromProto(t);
-              list.add(s);
-            }
-            pair.setFirst(list);
-            ByteBuffer bb = ByteBuffer.allocate(8).put(
-                getBytesFromResponse(response.getSecondPart()));
-            bb.rewind();
-            pair.setSecond(bb.getLong());
-            return pair;
-          }
-        }, stdCallback);
-    return stdCallback.getStdParams();
-  }
-
-  /**
-   * This is the client side interface/handle for calling the std method for a
-   * given cf-cq combination. It was necessary to add one more call stack as its
-   * return type should be a decimal value, irrespective of what
-   * columninterpreter says. So, this methods collects the necessary parameters
-   * to compute the std and returns the double value.
-   * @param tableName
-   * @param ci
-   * @param scan
-   * @return &lt;R, S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message>
-  double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
-      Scan scan) throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
-        return std(table, ci, scan);
-    }
-  }
-
-  /**
-   * This is the client side interface/handle for calling the std method for a
-   * given cf-cq combination. It was necessary to add one more call stack as its
-   * return type should be a decimal value, irrespective of what
-   * columninterpreter says. So, this methods collects the necessary parameters
-   * to compute the std and returns the double value.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return &lt;R, S&gt;
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message> double std(
-      final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
-    Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
-    double res = 0d;
-    double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
-    double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
-    res = avgOfSumSq - (avg) * (avg); // variance
-    res = Math.pow(res, 0.5);
-    return res;
-  }
-
-  /**
-   * It helps locate the region with median for a given column whose weight 
-   * is specified in an optional column.
-   * From individual regions, it obtains sum of values and sum of weights.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return pair whose first element is a map between start row of the region
-   *  and (sum of values, sum of weights) for the region, the second element is
-   *  (sum of values, sum of weights) for all the regions chosen
-   * @throws Throwable
-   */
-  private <R, S, P extends Message, Q extends Message, T extends Message>
-  Pair<NavigableMap<byte[], List<S>>, List<S>>
-  getMedianArgs(final Table table,
-      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
-    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
-    final NavigableMap<byte[], List<S>> map =
-      new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
-    class StdCallback implements Batch.Callback<List<S>> {
-      S sumVal = null, sumWeights = null;
-
-      public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
-        List<S> l = new ArrayList<S>();
-        l.add(sumVal);
-        l.add(sumWeights);
-        Pair<NavigableMap<byte[], List<S>>, List<S>> p =
-          new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
-        return p;
-      }
-
-      @Override
-      public synchronized void update(byte[] region, byte[] row, List<S> result) {
-        map.put(row, result);
-        sumVal = ci.add(sumVal, result.get(0));
-        sumWeights = ci.add(sumWeights, result.get(1));
-      }
-    }
-    StdCallback stdCallback = new StdCallback();
-    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
-        new Batch.Call<AggregateService, List<S>>() {
-          @Override
-          public List<S> call(AggregateService instance) throws IOException {
-            ServerRpcController controller = new ServerRpcController();
-            BlockingRpcCallback<AggregateResponse> rpcCallback = 
-                new BlockingRpcCallback<AggregateResponse>();
-            instance.getMedian(controller, requestArg, rpcCallback);
-            AggregateResponse response = rpcCallback.get();
-            if (controller.failedOnException()) {
-              throw controller.getFailedOn();
-            }
-
-            List<S> list = new ArrayList<S>();
-            for (int i = 0; i < response.getFirstPartCount(); i++) {
-              ByteString b = response.getFirstPart(i);
-              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
-              S s = ci.getPromotedValueFromProto(t);
-              list.add(s);
-            }
-            return list;
-          }
-
-        }, stdCallback);
-    return stdCallback.getMedianParams();
-  }
-
-  /**
-   * This is the client side interface/handler for calling the median method for a
-   * given cf-cq combination. This method collects the necessary parameters
-   * to compute the median and returns the median.
-   * @param tableName
-   * @param ci
-   * @param scan
-   * @return R the median
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message>
-  R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
-      Scan scan) throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
-        return median(table, ci, scan);
-    }
-  }
-
-  /**
-   * This is the client side interface/handler for calling the median method for a
-   * given cf-cq combination. This method collects the necessary parameters
-   * to compute the median and returns the median.
-   * @param table
-   * @param ci
-   * @param scan
-   * @return R the median
-   * @throws Throwable
-   */
-  public <R, S, P extends Message, Q extends Message, T extends Message>
-  R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
-      Scan scan) throws Throwable {
-    Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
-    byte[] startRow = null;
-    byte[] colFamily = scan.getFamilies()[0];
-    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
-    NavigableMap<byte[], List<S>> map = p.getFirst();
-    S sumVal = p.getSecond().get(0);
-    S sumWeights = p.getSecond().get(1);
-    double halfSumVal = ci.divideForAvg(sumVal, 2L);
-    double movingSumVal = 0;
-    boolean weighted = false;
-    if (quals.size() > 1) {
-      weighted = true;
-      halfSumVal = ci.divideForAvg(sumWeights, 2L);
-    }
-    
-    for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
-      S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
-      double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
-      if (newSumVal > halfSumVal) break;  // we found the region with the median
-      movingSumVal = newSumVal;
-      startRow = entry.getKey();
-    }
-    // scan the region with median and find it
-    Scan scan2 = new Scan(scan);
-    // inherit stop row from method parameter
-    if (startRow != null) scan2.setStartRow(startRow);
-    ResultScanner scanner = null;
-    try {
-      int cacheSize = scan2.getCaching();
-      if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
-        scan2.setCacheBlocks(true);
-        cacheSize = 5;
-        scan2.setCaching(cacheSize);
-      }
-      scanner = table.getScanner(scan2);
-      Result[] results = null;
-      byte[] qualifier = quals.pollFirst();
-      // qualifier for the weight column
-      byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
-      R value = null;
-      do {
-        results = scanner.next(cacheSize);
-        if (results != null && results.length > 0) {
-          for (int i = 0; i < results.length; i++) {
-            Result r = results[i];
-            // retrieve weight
-            Cell kv = r.getColumnLatestCell(colFamily, weightQualifier);
-            R newValue = ci.getValue(colFamily, weightQualifier, kv);
-            S s = ci.castToReturnType(newValue);
-            double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
-            // see if we have moved past the median
-            if (newSumVal > halfSumVal) {
-              return value;
-            }
-            movingSumVal = newSumVal;
-            kv = r.getColumnLatestCell(colFamily, qualifier);
-            value = ci.getValue(colFamily, qualifier, kv);
-            }
-          }
-      } while (results != null && results.length > 0);
-    } finally {
-      if (scanner != null) {
-        scanner.close();
-      }
-    }
-    return null;
-  }
-
-  <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest 
-  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
-      throws IOException {
-    validateParameters(scan, canFamilyBeAbsent);
-    final AggregateRequest.Builder requestBuilder = 
-        AggregateRequest.newBuilder();
-    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
-    P columnInterpreterSpecificData = null;
-    if ((columnInterpreterSpecificData = ci.getRequestData()) 
-       != null) {
-      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
-    }
-    requestBuilder.setScan(ProtobufUtil.toScan(scan));
-    return requestBuilder.build();
-  }
-
-  byte[] getBytesFromResponse(ByteString response) {
-    ByteBuffer bb = response.asReadOnlyByteBuffer();
-    bb.rewind();
-    byte[] bytes;
-    if (bb.hasArray()) {
-      bytes = bb.array();
-    } else {
-      bytes = response.toByteArray();
-    }
-    return bytes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java
index 5d1cc91..7d08b7e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java
@@ -30,9 +30,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BigDecimalMsg;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.protobuf.ByteString;
+
 /**
  * ColumnInterpreter for doing Aggregation's with BigDecimal columns. This class
  * is required at the RegionServer also.
@@ -123,9 +124,9 @@ public class BigDecimalColumnInterpreter extends ColumnInterpreter<BigDecimal, B
 
   private BigDecimalMsg getProtoForType(BigDecimal t) {
     BigDecimalMsg.Builder builder = BigDecimalMsg.newBuilder();
-    return builder.setBigdecimalMsg(ByteStringer.wrap(Bytes.toBytes(t))).build();
+    return builder.setBigdecimalMsg(ByteString.copyFrom(Bytes.toBytes(t))).build();
   }
-  
+
   @Override
   public BigDecimalMsg getProtoForCellType(BigDecimal t) {
     return getProtoForType(t);

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
index 225e685..2590021 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -18,18 +18,17 @@
  */
 package org.apache.hadoop.hbase.client.replication;
 
-import com.google.protobuf.ByteString;
-
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Strings;
@@ -279,7 +278,6 @@ public final class ReplicationSerDeHelper {
     if (tableCFsMap != null) {
       peerConfig.setTableCFsMap(tableCFsMap);
     }
-
     List<ByteString> namespacesList = peer.getNamespacesList();
     if (namespacesList != null && namespacesList.size() != 0) {
       Set<String> namespaces = new HashSet<String>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java
index 3cbb7b9..b59398b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java
@@ -24,11 +24,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A binary comparator which lexicographically compares against the specified
@@ -36,8 +37,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class BinaryComparator extends ByteArrayComparable {
-
+public class BinaryComparator extends org.apache.hadoop.hbase.filter.ByteArrayComparable {
   /**
    * Constructor
    * @param value value
@@ -62,7 +62,7 @@ public class BinaryComparator extends ByteArrayComparable {
   public byte [] toByteArray() {
     ComparatorProtos.BinaryComparator.Builder builder =
       ComparatorProtos.BinaryComparator.newBuilder();
-    builder.setComparable(super.convert());
+    builder.setComparable(ProtobufUtil.toByteArrayComparable(this.value));
     return builder.build().toByteArray();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java
index a26edbc..01cb769 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java
@@ -24,11 +24,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A comparator which compares against a specified byte array, but only compares
@@ -67,7 +68,7 @@ public class BinaryPrefixComparator extends ByteArrayComparable {
   public byte [] toByteArray() {
     ComparatorProtos.BinaryPrefixComparator.Builder builder =
       ComparatorProtos.BinaryPrefixComparator.newBuilder();
-    builder.setComparable(super.convert());
+    builder.setComparable(ProtobufUtil.toByteArrayComparable(this.value));
     return builder.build().toByteArray();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java
index db51df7..dac8864 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java
@@ -24,9 +24,10 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos;
 
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A bit comparator which performs the specified bitwise operation on each of the bytes
@@ -72,7 +73,7 @@ public class BitComparator extends ByteArrayComparable {
   public byte [] toByteArray() {
     ComparatorProtos.BitComparator.Builder builder =
       ComparatorProtos.BitComparator.newBuilder();
-    builder.setComparable(super.convert());
+    builder.setComparable(ProtobufUtil.toByteArrayComparable(this.value));
     ComparatorProtos.BitComparator.BitwiseOp bitwiseOpPb =
       ComparatorProtos.BitComparator.BitwiseOp.valueOf(bitOperator.name());
     builder.setBitwiseOp(bitwiseOpPb);

http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
index c747b00..3ae20a1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Simple filter that returns first N columns on row only.


Mime
View raw message