hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject [32/50] [abbrv] git commit: [HBASE-11416] Add size aware ThriftHRegionServer
Date Thu, 31 Jul 2014 22:08:08 GMT
[HBASE-11416] Add size aware ThriftHRegionServer

Summary:
  - Add Async ThriftHRegionServer.
  - Added read and write call queues so that writes will be seperated from reads.
  - Added size aware ThriftHRegionServer that will throw errors if the call queue gets too large.

Test Plan: Test on Dev cluster soon.

Reviewers: rshroff, manukranthk

Reviewed By: rshroff

Subscribers: abbyedwards, hbase-prodeng@, hbase-eng@

Differential Revision: https://phabricator.fb.com/D1404826

Tasks: 4589584

git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@43186 e7acf4d4-3532-417f-9e73-7a9ae25a1f51


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6b30de6e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6b30de6e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6b30de6e

Branch: refs/heads/0.89-fb
Commit: 6b30de6e381c805315f5bc9a61550bca41ae6ade
Parents: 5ff5311
Author: elliott <elliott@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Thu Jun 26 22:42:11 2014 +0000
Committer: Elliott Clark <elliott@fb.com>
Committed: Thu Jul 31 14:44:23 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |   2 +-
 .../hbase/regionserver/HRegionServer.java       |   2 +-
 .../SizeAwareThriftHRegionServer.java           | 148 +++
 .../hbase/regionserver/ThriftHRegionServer.java | 993 ++++++++++++-------
 .../apache/hadoop/hbase/MiniHBaseCluster.java   |   5 +-
 .../FailureInjectingThriftHRegionServer.java    |  12 +-
 .../TestHRegionInterfaceSimpleFunctions.java    |  34 +-
 7 files changed, 798 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6b30de6e/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ddaabce..3970bc2 100644
--- a/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -833,7 +833,7 @@ public final class HConstants {
    * Number of threads for swift server
    */
   public static final String SWIFT_WORKER_THREADS = "hbase.swift.worker.threads";
-  public static final int SWIFT_WORKER_THREADS_DEFAULT = 300;
+  public static final int SWIFT_WORKER_THREADS_DEFAULT = 10;
 
   /**
    * Number of io threads for swift server

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b30de6e/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3f3947d..7c1c736 100755
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -638,7 +638,7 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
               HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT);
       Class<? extends ThriftHRegionServer> thriftServerClass =
           (Class<? extends ThriftHRegionServer>)
-              conf.getClass(HConstants.THRIFT_REGION_SERVER_IMPL, ThriftHRegionServer.class);
+              conf.getClass(HConstants.THRIFT_REGION_SERVER_IMPL, SizeAwareThriftHRegionServer.class);
 
       ThriftHRegionServer thriftServer;
       try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b30de6e/src/main/java/org/apache/hadoop/hbase/regionserver/SizeAwareThriftHRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SizeAwareThriftHRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SizeAwareThriftHRegionServer.java
new file mode 100644
index 0000000..d2526c6
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SizeAwareThriftHRegionServer.java
@@ -0,0 +1,148 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiPut;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.TMultiResponse;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SizeAwareThriftHRegionServer extends ThriftHRegionServer {
+
+  private final AtomicLong writeQueueSizebytes = new AtomicLong(0);
+  private final long maxWriteSize;
+
+  public SizeAwareThriftHRegionServer(HRegionServer server) {
+    super(server);
+    this.maxWriteSize = server.getConfiguration().getLong(
+        HConstants.MAX_CALL_QUEUE_MEMORY_SIZE_STRING, HConstants.MAX_CALL_QUEUE_MEMORY_SIZE);
+  }
+
+  @Override
+  public ListenableFuture<Void> put(final byte[] regionName, final Put put) {
+    if (writeQueueSizebytes.get() < maxWriteSize) {
+      final long heapSize = put.heapSize();
+      writeQueueSizebytes.addAndGet(heapSize);
+      ListenableFuture<Void> f = super.put(regionName, put);
+      Futures.addCallback(f, new DecrementWriteSizeCallback(heapSize));
+      return f;
+    } else {
+      return Futures.immediateFailedFuture(new RegionOverloadedException());
+    }
+  }
+
+  @Override
+  public ListenableFuture<Integer> putRows(final byte[] regionName, final List<Put> puts) {
+    if (writeQueueSizebytes.get() < maxWriteSize) {
+      long heapSize = 0;
+      for (Put p : puts) {
+        heapSize += p.heapSize();
+      }
+      writeQueueSizebytes.addAndGet(heapSize);
+      ListenableFuture<Integer> f = super.putRows(regionName, puts);
+      Futures.addCallback(f, new DecrementWriteSizeCallback(heapSize));
+      return f;
+    } else {
+      return Futures.immediateFailedFuture(new RegionOverloadedException());
+    }
+  }
+
+  @Override
+  public ListenableFuture<TMultiResponse> multiAction(final MultiAction multi) {
+    if (writeQueueSizebytes.get() < maxWriteSize) {
+      long heapSize = 0;
+      if (multi.getPuts() != null) {
+        for (List<Put> puts : multi.getPuts().values()) {
+          for (Put p : puts) {
+            heapSize += p.heapSize();
+          }
+        }
+      }
+      if (multi.getDeletes() != null) {
+        for (List<Delete> deletes:multi.getDeletes().values()) {
+          for (Delete d:deletes) {
+            heapSize += d.getRow().length * d.size();
+          }
+        }
+      }
+
+      writeQueueSizebytes.addAndGet(heapSize);
+      ListenableFuture<TMultiResponse> f = super.multiAction(multi);
+      Futures.addCallback(f, new DecrementWriteSizeCallback(heapSize));
+      return f;
+    } else {
+      return Futures.immediateFailedFuture(new RegionOverloadedException());
+    }
+  }
+
+  @Override
+  public ListenableFuture<MultiPutResponse> multiPut(final MultiPut puts) {
+    if (writeQueueSizebytes.get() < maxWriteSize) {
+      long heapSize = 0;
+      for (List<Put> putList : puts.getPuts().values()) {
+        for (Put p : putList) {
+          heapSize += p.heapSize();
+        }
+      }
+      writeQueueSizebytes.addAndGet(heapSize);
+      ListenableFuture<MultiPutResponse> f = super.multiPut(puts);
+      Futures.addCallback(f, new DecrementWriteSizeCallback(heapSize));
+      return f;
+    } else {
+      return Futures.immediateFailedFuture(new RegionOverloadedException());
+    }
+  }
+
+  @Override
+  public ListenableFuture<Void> processDelete(final byte[] regionName, final Delete delete)  {
+    if (writeQueueSizebytes.get() < maxWriteSize) {
+      long heapSize = delete.size() * delete.getRow().length;
+      writeQueueSizebytes.addAndGet(heapSize);
+      ListenableFuture<Void> f = super.processDelete(regionName, delete);
+      Futures.addCallback(f, new DecrementWriteSizeCallback(heapSize));
+      return f;
+    } else {
+      return Futures.immediateFailedFuture(new RegionOverloadedException());
+    }
+  }
+
+  @Override
+  public ListenableFuture<Integer> processListOfDeletes(final byte[] regionName, final List<Delete> deletes) {
+    if (writeQueueSizebytes.get() < maxWriteSize) {
+      long heapSize = 0;
+      for (Delete d:deletes) {
+        heapSize += d.size() * d.getRow().length;
+      }
+      writeQueueSizebytes.addAndGet(heapSize);
+      ListenableFuture<Integer> f = super.processListOfDeletes(regionName, deletes);
+      Futures.addCallback(f, new DecrementWriteSizeCallback(heapSize));
+      return f;
+    } else {
+      return Futures.immediateFailedFuture(new RegionOverloadedException());
+    }
+  }
+
+  private class DecrementWriteSizeCallback<V> implements FutureCallback<V> {
+    private final long heapSize;
+
+    public DecrementWriteSizeCallback(long heapSize) {
+      this.heapSize = heapSize;
+    }
+
+    @Override public void onSuccess(@Nullable V aVoid) {
+      writeQueueSizebytes.addAndGet(-1 * heapSize);
+    }
+
+    @Override public void onFailure(Throwable throwable) {
+      writeQueueSizebytes.addAndGet(-1 * heapSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b30de6e/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
index eeea3df..0d6eaca 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
@@ -27,9 +27,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -48,12 +54,14 @@ import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TMultiResponse;
 import org.apache.hadoop.hbase.client.TRowMutations;
+import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
 import org.apache.hadoop.hbase.ipc.ScannerResult;
 import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
 import org.apache.hadoop.hbase.master.AssignmentPlan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -67,477 +75,639 @@ import com.google.common.util.concurrent.ListenableFuture;
  * This is just a wrapper around {@link HRegionServer}
  *
  */
-public class ThriftHRegionServer implements ThriftHRegionInterface.Sync {
+public class ThriftHRegionServer implements ThriftHRegionInterface.Async {
   private static Log LOG = LogFactory.getLog(ThriftHRegionServer.class);
 
   private HRegionServer server;
 
   public ThriftHRegionServer(HRegionServer server) {
     this.server = server;
-  }
+    final Configuration configuration = server.getConfiguration();
 
-  @Override
-  public HRegionInfo getRegionInfo(byte[] regionName)
-      throws ThriftHBaseException {
-    try {
-      HRegionInfo r = server.getRegionInfo(regionName);
-      LOG.debug("Printing the result of getClosestRowOrBefore : " + r);
-      return r;
-    } catch (NotServingRegionException e) {
-      e.printStackTrace();
-      throw new ThriftHBaseException(e);
-    }
+    // Current default assumes 80% writes to 20% reads.
+    int readHandlers = configuration.getInt("hbase.regionserver.handler.read.count",
+        configuration.getInt("hbase.regionserver.handler.count", 300) / 5 );
+    int writeHandlers = configuration.getInt("hbase.regionserver.handler.write.count",
+        configuration.getInt("hbase.regionserver.handler.count", 300) - readHandlers );
+
+    writeService = MoreExecutors.listeningDecorator(
+        Executors.newFixedThreadPool(writeHandlers,
+            new DaemonThreadFactory("HBase-Write-Handler-")));
+
+    readService = MoreExecutors.listeningDecorator(
+        Executors.newFixedThreadPool(readHandlers,
+            new DaemonThreadFactory("HBase-Read-Handler-")));
   }
 
+  private final ListeningExecutorService writeService;
+
+
+  private final ListeningExecutorService readService;
+
+
   @Override
-  public Result getClosestRowBefore(byte[] regionName, byte[] row, byte[] family)
-      throws ThriftHBaseException {
-    try {
-      Result r =  server.getClosestRowBefore(regionName, row, family);
-      if (r == null) {
-        return Result.SENTINEL_RESULT;
-      } else {
-        return addThriftRegionInfoQualifierIfNeeded(r);
+  public ListenableFuture<HRegionInfo> getRegionInfo(final byte[] regionName) {
+    return readService.submit(new Callable<HRegionInfo>() {
+      @Override public HRegionInfo call() throws Exception {
+        try {
+          HRegionInfo r = server.getRegionInfo(regionName);
+          LOG.debug("Printing the result of getClosestRowOrBefore : " + r);
+          return r;
+        } catch (NotServingRegionException e) {
+          throw new ThriftHBaseException(e);
+        }
       }
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+    });
   }
 
-  /**
-   * If there is a 'regioninfo' value in the result, it would be in Writable
-   * serialized form. Add a thrift serialized HRegionInfo object for the
-   * non-Java objects.
-   */
-  private Result addThriftRegionInfoQualifierIfNeeded(Result tentativeResult) {
-    //TODO: Thrift has some problem serializing HRegionInfo. Since this method is only
-    // for C++ client, I temporarily disable it. After the problem is fixed we should
-    // remove the flag.
-    if (HConstants.DISABLE_THRIFT_REGION_INFO_QUALIFIER) {
-      return tentativeResult;
-    }
+  @Override
+  public ListenableFuture<Result> getClosestRowBefore(final byte[] regionName, final byte[] row,
+      final byte[] family) {
+    return readService.submit(new Callable<Result>(){
 
-    // Get the serialized HRegionInfo object
-    byte[] value = tentativeResult.searchValue(HConstants.CATALOG_FAMILY,
-      HConstants.REGIONINFO_QUALIFIER);
-    // If the value exists, then we need to do something, otherwise, we return
-    // the result untouched.
-    if (value != null && value.length > 0) {
-      try {
-        // Get the Writable-serialized HRegionInfo object.
-        HRegionInfo hri = (HRegionInfo) Writables.getWritable(value,
-          new HRegionInfo());
-        byte[] thriftSerializedHri =
-          Bytes.writeThriftBytes(hri, HRegionInfo.class);
-        // Create the KV with the thrift-serialized HRegionInfo
-        List<KeyValue> kvList = tentativeResult.getKvs();
-        KeyValue kv = new KeyValue(kvList.get(0).getRow(),
-          HConstants.CATALOG_FAMILY, HConstants.THRIFT_REGIONINFO_QUALIFIER,
-          thriftSerializedHri);
-        kvList.add(kv);
-        return new Result(kvList);
-      } catch (Exception e) {
-        // If failed, log the mistake and returns the original result
-        LOG.error("Thrift Serialization of the HRegionInfo object failed!");
-        e.printStackTrace();
+      @Override public Result call() throws Exception {
+        try {
+          Result r =  server.getClosestRowBefore(regionName, row, family);
+          if (r == null) {
+            return Result.SENTINEL_RESULT;
+          } else {
+            return addThriftRegionInfoQualifierIfNeeded(r);
+          }
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
       }
-    }
-    return tentativeResult;
+    });
+
   }
 
   @Override
-  public void flushRegion(byte[] regionName)
-      throws ThriftHBaseException {
-    try {
-      server.flushRegion(regionName);
-    } catch (Exception e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Void> flushRegion(final byte[] regionName) {
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.flushRegion(regionName);
+        } catch (Exception e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public void flushRegion(byte[] regionName, long ifOlderThanTS)
-      throws ThriftHBaseException {
-    try {
-      server.flushRegion(regionName, ifOlderThanTS);
-    } catch (IllegalArgumentException e) {
-      throw new ThriftHBaseException(e);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Void> flushRegion(final byte[] regionName, final long ifOlderThanTS) {
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.flushRegion(regionName, ifOlderThanTS);
+        } catch (IllegalArgumentException e) {
+          throw new ThriftHBaseException(e);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public long getLastFlushTime(byte[] regionName) {
-    return server.getLastFlushTime(regionName);
+  public ListenableFuture<Long> getLastFlushTime(final byte[] regionName) {
+    return readService.submit(new Callable<Long>() {
+      @Override public Long call() throws Exception {
+        return server.getLastFlushTime(regionName);
+      }
+    });
   }
 
   @Override
-  public Map<byte[], Long> getLastFlushTimes() {
-    MapWritable mapWritable = server.getLastFlushTimes();
-    Map<byte[], Long> map = new HashMap<byte[], Long>();
+  public ListenableFuture<Map<byte[], Long>> getLastFlushTimes() {
+    return readService.submit(new Callable<Map<byte[], Long>>() {
+      @Override public Map<byte[], Long> call() throws Exception {
+        MapWritable mapWritable = server.getLastFlushTimes();
+        Map<byte[], Long> map = new HashMap<>();
 
-    for (Entry<Writable, Writable> e : mapWritable.entrySet()) {
-      map.put(((BytesWritable) e.getKey()).getBytes(),
-          ((LongWritable) e.getValue()).get());
-    }
-    return map;
+        for (Entry<Writable, Writable> e : mapWritable.entrySet()) {
+          map.put(((BytesWritable) e.getKey()).getBytes(),
+              ((LongWritable) e.getValue()).get());
+        }
+        return map;
+      }
+    });
   }
 
   @Override
-  public long getCurrentTimeMillis() {
-    return server.getCurrentTimeMillis();
+  public ListenableFuture<Long> getCurrentTimeMillis() {
+    return readService.submit(new Callable<Long>() {
+      @Override public Long call() throws Exception {
+        return server.getCurrentTimeMillis();
+      }
+    });
   }
 
   @Override
-  public long getStartCode() {
-    return server.getStartCode();
+  public ListenableFuture<Long> getStartCode() {
+    return readService.submit(new Callable<Long>() {
+      @Override public Long call() throws Exception {
+        return server.getStartCode();
+      }
+    });
   }
 
   @Override
-  public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
-      throws ThriftHBaseException {
-    try {
-      return server.getStoreFileList(regionName, columnFamily);
-    } catch (IllegalArgumentException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<String>> getStoreFileList(final byte[] regionName, final byte[] columnFamily) {
+    return readService.submit(new Callable<List<String>>() {
+      @Override public List<String> call() throws Exception {
+        try {
+          return server.getStoreFileList(regionName, columnFamily);
+        } catch (IllegalArgumentException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public List<String> getStoreFileListForColumnFamilies(byte[] regionName,
-      List<byte[]> columnFamilies) throws ThriftHBaseException {
-    try {
-      byte[][] columnFamiliesArray = new byte[columnFamilies.size()][];
-      return server.getStoreFileList(regionName, columnFamilies.toArray(columnFamiliesArray));
-    } catch (IllegalArgumentException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<String>> getStoreFileListForColumnFamilies(final byte[] regionName,
+      final List<byte[]> columnFamilies) {
+    return readService.submit(new Callable<List<String>>() {
+      @Override public List<String> call() throws Exception {
+        try {
+          byte[][] columnFamiliesArray = new byte[columnFamilies.size()][];
+          return server.getStoreFileList(regionName, columnFamilies.toArray(columnFamiliesArray));
+        } catch (IllegalArgumentException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public List<String> getStoreFileListForAllColumnFamilies(byte[] regionName)
-      throws ThriftHBaseException {
-    try {
-      return server.getStoreFileList(regionName);
-    } catch (IllegalArgumentException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<String>> getStoreFileListForAllColumnFamilies(final byte[] regionName) {
+    return readService.submit(new Callable<List<String>>() {
+      @Override public List<String> call() throws Exception {
+        try {
+          return server.getStoreFileList(regionName);
+        } catch (IllegalArgumentException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public List<String> getHLogsList(boolean rollCurrentHLog)
-      throws ThriftHBaseException {
-    try {
-      return server.getHLogsList(rollCurrentHLog);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<String>> getHLogsList(final boolean rollCurrentHLog) {
+    return readService.submit(new Callable<List<String>>() {
+      @Override public List<String> call() throws Exception {
+        try {
+          return server.getHLogsList(rollCurrentHLog);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public Result get(byte[] regionName, Get get) throws ThriftHBaseException {
-    try {
-      return server.get(regionName, get);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Result> get(final byte[] regionName, final Get get) {
+    return readService.submit(new Callable<Result>() {
+      @Override public Result call() throws Exception {
+        try {
+          return server.get(regionName, get);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
 
   @Override
-  public List<Result> getRows(byte[] regionName, List<Get> gets)
-      throws ThriftHBaseException {
-    try {
-      List<Result> resultList = new ArrayList<>();
-      Collections.addAll(resultList, server.get(regionName, gets));
-      return resultList;
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<Result>> getRows(final byte[] regionName, final List<Get> gets) {
+    return readService.submit(new Callable<List<Result>>() {
+      @Override public List<Result> call() throws Exception {
+        try {
+          List<Result> resultList = new ArrayList<>();
+          Collections.addAll(resultList, server.get(regionName, gets));
+          return resultList;
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public boolean exists(byte[] regionName, Get get)
-      throws ThriftHBaseException {
-    try {
-      return server.exists(regionName, get);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Boolean> exists(final byte[] regionName, final Get get){
+    return readService.submit(new Callable<Boolean>() {
+      @Override public Boolean call() throws Exception {
+        try {
+          return server.exists(regionName, get);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
 
   @Override
-  public void put(byte[] regionName, Put put) throws ThriftHBaseException {
-    try {
-      server.put(regionName, put);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
-  }
+  public ListenableFuture<Void> put(final byte[] regionName, final Put put) {
 
-  @Override
-  public int putRows(byte[] regionName, List<Put> puts)
-      throws ThriftHBaseException {
-    try {
-      return server.put(regionName, puts);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.put(regionName, put);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public void processDelete(byte[] regionName, Delete delete)
-      throws ThriftHBaseException {
-    try {
-      server.delete(regionName, delete);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Integer> putRows(final byte[] regionName, final List<Put> puts) {
+    return writeService.submit(new Callable<Integer>() {
+      @Override public Integer call() throws Exception {
+        try {
+          return server.put(regionName, puts);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public int processListOfDeletes(byte[] regionName, List<Delete> deletes)
-      throws ThriftHBaseException {
-    try {
-      return server.delete(regionName, deletes);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Void> processDelete(final byte[] regionName, final Delete delete)  {
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.delete(regionName, delete);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
-      byte[] qualifier, byte[] value, Put put) throws ThriftHBaseException {
-    try {
-      return server.checkAndPut(regionName, row, family, qualifier, value, put);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Integer> processListOfDeletes(final byte[] regionName, final List<Delete> deletes) {
+    return writeService.submit(new Callable<Integer>() {
+      @Override public Integer call() throws Exception {
+        try {
+          return server.delete(regionName, deletes);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
-      byte[] qualifier, byte[] value, Delete delete)
-      throws ThriftHBaseException {
-    try {
-      return server.checkAndDelete(regionName, row, family, qualifier, value,
-          delete);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Boolean> checkAndPut(final byte[] regionName, final byte[] row, final byte[] family,
+      final byte[] qualifier, final byte[] value, final Put put) {
+
+    return writeService.submit(new Callable<Boolean>() {
+      @Override public Boolean call() throws Exception {
+        try {
+          return server.checkAndPut(regionName, row, family, qualifier, value, put);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public long incrementColumnValue(byte[] regionName, byte[] row,
-      byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-      throws ThriftHBaseException {
-    try {
-      return server.incrementColumnValue(regionName, row, family, qualifier, amount, writeToWAL);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Boolean> checkAndDelete(final byte[] regionName, final byte[] row,
+      final byte[] family, final byte[] qualifier, final byte[] value, final Delete delete) {
+
+    return writeService.submit(new Callable<Boolean>() {
+      @Override public Boolean call() throws Exception {
+        try {
+          return server.checkAndDelete(regionName, row, family, qualifier, value,
+              delete);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public long openScanner(byte[] regionName, Scan scan)
-      throws ThriftHBaseException {
-    try {
-      return server.openScanner(regionName, scan);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Long> incrementColumnValue(final byte[] regionName, final byte[] row,
+      final byte[] family, final byte[] qualifier, final long amount, final boolean writeToWAL) {
+
+    return writeService.submit(new Callable<Long>() {
+      @Override public Long call() throws Exception {
+        try {
+          return server.incrementColumnValue(regionName, row, family, qualifier, amount, writeToWAL);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public void mutateRow(byte[] regionName, TRowMutations arm)
-      throws ThriftHBaseException {
-    try {
-      server.mutateRow(regionName, RowMutations.Builder.createFromTRowMutations(arm));
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Long> openScanner(final byte[] regionName, final Scan scan) {
+    return readService.submit(new Callable<Long>() {
+      @Override public Long call() throws Exception {
+        try {
+          return server.openScanner(regionName, scan);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public void mutateRows(byte[] regionName, List<TRowMutations> armList)
-      throws ThriftHBaseException {
-    try {
-      List<RowMutations> rowMutations = new ArrayList<>();
-      for (TRowMutations mutation : armList) {
-        rowMutations.add(RowMutations.Builder.createFromTRowMutations(mutation));
+  public ListenableFuture<Void> mutateRow(final byte[] regionName, final TRowMutations arm) {
+
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.mutateRow(regionName, RowMutations.Builder.createFromTRowMutations(arm));
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
       }
-      server.mutateRow(regionName, rowMutations);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+    });
+  }
+
+  @Override
+  public ListenableFuture<Void> mutateRows(final byte[] regionName, final List<TRowMutations> armList) {
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          List<RowMutations> rowMutations = new ArrayList<>();
+          for (TRowMutations mutation : armList) {
+            rowMutations.add(RowMutations.Builder.createFromTRowMutations(mutation));
+          }
+          server.mutateRow(regionName, rowMutations);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
   @Deprecated
-  public Result next(long scannerId) throws ThriftHBaseException {
-    try {
-      return server.next(scannerId);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Result> next(final long scannerId) {
+    return readService.submit(new Callable<Result>() {
+      @Override public Result call() throws Exception {
+        try {
+          return server.next(scannerId);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<List<Result>> nextRows(final long scannerId, final int numberOfRows) {
+    return readService.submit(new Callable<List<Result>>() {
+      @Override public List<Result> call() throws Exception {
+        try {
+          Result[] result = server.nextInternal(scannerId, numberOfRows);
+          List<Result> resultList = new ArrayList<>(result.length);
+          for (int i = 0; i < result.length; i ++) {
+            resultList.add(addThriftRegionInfoQualifierIfNeeded(result[i]));
+          }
+          return resultList;
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public List<Result> nextRows(long scannerId, int numberOfRows)
-      throws ThriftHBaseException {
-    try {
-      Result[] result = server.nextInternal(scannerId, numberOfRows);
-      List<Result> resultList = new ArrayList<>(result.length);
-      for (int i = 0; i < result.length; i ++) {
-        resultList.add(this.addThriftRegionInfoQualifierIfNeeded(result[i]));
+  public ListenableFuture<Void> close(final long scannerId) {
+    return readService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.close(scannerId);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
       }
-      return resultList;
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+    });
   }
 
   @Override
-  public void close(long scannerId) throws ThriftHBaseException {
-    try {
-      server.close(scannerId);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<RowLock> lockRow(final byte[] regionName, final byte[] row) {
+    return readService.submit(new Callable<RowLock>() {
+      @Override public RowLock call() throws Exception {
+        try {
+          return new RowLock(row, server.lockRow(regionName, row));
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public RowLock lockRow(byte[] regionName, byte[] row)
-      throws ThriftHBaseException {
-    try {
-      return new RowLock(row, server.lockRow(regionName, row));
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Void> unlockRow(final byte[] regionName, final long lockId) {
+    return readService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.unlockRow(regionName, lockId);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public void unlockRow(byte[] regionName, long lockId)
-      throws ThriftHBaseException {
-    try {
-      server.unlockRow(regionName, lockId);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
-  }
+  public ListenableFuture<List<HRegionInfo>> getRegionsAssignment() {
 
-  @Override
-  public List<HRegionInfo> getRegionsAssignment() throws ThriftHBaseException {
-    try {
-      return Arrays.asList(server.getRegionsAssignment());
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+    return readService.submit(new Callable<List<HRegionInfo>>() {
+      @Override public List<HRegionInfo> call() throws Exception {
+        try {
+          return Arrays.asList(server.getRegionsAssignment());
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public HServerInfo getHServerInfo() throws ThriftHBaseException {
-    try {
-      return server.getHServerInfo();
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<HServerInfo> getHServerInfo() {
+    return readService.submit(new Callable<HServerInfo>() {
+      @Override public HServerInfo call() throws Exception {
+        try {
+          return server.getHServerInfo();
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public TMultiResponse multiAction(MultiAction multi)
-      throws ThriftHBaseException {
-    try {
-      return TMultiResponse.Builder.createFromMultiResponse(server
-          .multiAction(multi));
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<TMultiResponse> multiAction(final MultiAction multi) {
+    return writeService.submit(new Callable<TMultiResponse>() {
+      @Override public TMultiResponse call() throws Exception {
+        try {
+          return TMultiResponse.Builder.createFromMultiResponse(server
+              .multiAction(multi));
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public MultiPutResponse multiPut(MultiPut puts) throws ThriftHBaseException {
-    try {
-      return server.multiPut(puts);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<MultiPutResponse> multiPut(final MultiPut puts) {
+
+    return writeService.submit(new Callable<MultiPutResponse>() {
+      @Override public MultiPutResponse call() throws Exception {
+        try {
+          return server.multiPut(puts);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public void bulkLoadHFile(String hfilePath, byte[] regionName,
-      byte[] familyName) throws ThriftHBaseException {
-    try {
-      server.bulkLoadHFile(hfilePath, regionName, familyName);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Void> bulkLoadHFile(final String hfilePath, final byte[] regionName,
+      final byte[] familyName) {
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.bulkLoadHFile(hfilePath, regionName, familyName);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public void bulkLoadHFile(String hfilePath, byte[] regionName,
-      byte[] familyName, boolean assignSeqNum) throws ThriftHBaseException {
-    try {
-      server.bulkLoadHFile(hfilePath, regionName, familyName, assignSeqNum);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Void> bulkLoadHFile(final String hfilePath, final byte[] regionName,
+      final byte[] familyName, final boolean assignSeqNum) {
+    return writeService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.bulkLoadHFile(hfilePath, regionName, familyName, assignSeqNum);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public void closeRegion(HRegionInfo hri, boolean reportWhenCompleted)
-      throws ThriftHBaseException {
-    try {
-      server.closeRegion(hri, reportWhenCompleted);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Void> closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) {
+
+    return readService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        try {
+          server.closeRegion(hri, reportWhenCompleted);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+        return null;
+      }
+    });
   }
 
   @Override
-  public int updateFavoredNodes(AssignmentPlan plan) throws ThriftHBaseException {
-    try {
-      return server.updateFavoredNodes(plan);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<Integer> updateFavoredNodes(final AssignmentPlan plan) {
+    return readService.submit(new Callable<Integer>() {
+      @Override public Integer call() throws Exception {
+        try {
+          return server.updateFavoredNodes(plan);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public void updateConfiguration() {
-    server.updateConfiguration();
+  public ListenableFuture<Void> updateConfiguration() {
+    return readService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        server.updateConfiguration();
+        return null;
+      }
+    });
   }
 
   @Override
-  public void stop(String why) {
-    server.stop(why);
+  public ListenableFuture<Void> stop(final String why) {
+    return readService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        server.stop(why);
+        return null;
+      }
+    });
   }
 
   @Override
-  public String getStopReason() {
-    return server.getStopReason();
+  public ListenableFuture<String> getStopReason() {
+    return readService.submit(new Callable<String>() {
+      @Override public String call() throws Exception {
+        return server.getStopReason();
+      }
+    });
   }
 
   @Override
-  public void setNumHDFSQuorumReadThreads(int maxThreads) {
-    server.setNumHDFSQuorumReadThreads(maxThreads);
+  public ListenableFuture<Void> setNumHDFSQuorumReadThreads(final int maxThreads) {
+    return readService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        server.setNumHDFSQuorumReadThreads(maxThreads);
+        return null;
+      }
+    });
   }
 
   @Override
-  public void setHDFSQuorumReadTimeoutMillis(long timeoutMillis) {
+  public ListenableFuture<Void> setHDFSQuorumReadTimeoutMillis(long timeoutMillis) {
     server.setHDFSQuorumReadTimeoutMillis(timeoutMillis);
+    return null;
+
   }
 
   @Override
-  public boolean isStopped() {
-    return server.isStopped();
+  public ListenableFuture<Boolean> isStopped() {
+    return readService.submit(new Callable<Boolean>() {
+      @Override public Boolean call() throws Exception {
+        return server.isStopped();
+      }
+    });
   }
 
   @Override
-  public void stopForRestart() {
-    server.stopForRestart();
+  public ListenableFuture<Void> stopForRestart() {
+
+    return readService.submit(new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        server.stopForRestart();
+        return null;
+      }
+    });
   }
 
   @Override
@@ -546,76 +716,157 @@ public class ThriftHRegionServer implements ThriftHRegionInterface.Sync {
   }
 
   @Override
-  public String getConfProperty(String paramName) throws ThriftHBaseException {
-    return server.getConfProperty(paramName);
+  public ListenableFuture<String> getConfProperty(final String paramName) {
+    return readService.submit(new Callable<String>() {
+      @Override public String call() throws Exception {
+        return server.getConfProperty(paramName);
+      }
+    });
   }
 
   @Override
-  public List<Bucket> getHistogram(byte[] regionName)
-      throws ThriftHBaseException {
-    try {
-      List<Bucket> buckets = server.getHistogram(regionName);
-      if (buckets == null) return new ArrayList<Bucket>();
-      return buckets;
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<Bucket>> getHistogram(final byte[] regionName) {
+    return readService.submit(new Callable<List<Bucket>>() {
+      @Override public List<Bucket> call() throws Exception {
+        try {
+          List<Bucket> buckets = server.getHistogram(regionName);
+          if (buckets == null) return new ArrayList<Bucket>();
+          return buckets;
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public List<Bucket> getHistogramForStore(byte[] regionName, byte[] family)
-      throws ThriftHBaseException {
-    try {
-      List<Bucket> buckets = server.getHistogramForStore(regionName, family);
-      if (buckets == null) return new ArrayList<Bucket>();
-      return buckets;
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<Bucket>> getHistogramForStore(final byte[] regionName, final byte[] family) {
+    return readService.submit(new Callable<List<Bucket>>() {
+      @Override public List<Bucket> call() throws Exception {
+        try {
+          List<Bucket> buckets = server.getHistogramForStore(regionName, family);
+          if (buckets == null) return new ArrayList<Bucket>();
+          return buckets;
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public byte[] callEndpoint(String epName, String methodName,
-      List<byte[]> params, final byte[] regionName, final byte[] startRow,
-      final byte[] stopRow) throws ThriftHBaseException, IOException {
-    return server.callEndpoint(epName, methodName, params, regionName,
-        startRow, stopRow);
+  public ListenableFuture<byte[]> callEndpoint(final String epName, final String methodName,
+      final List<byte[]> params, final byte[] regionName, final byte[] startRow,
+      final byte[] stopRow) {
+
+    return readService.submit(new Callable<byte[]>() {
+      @Override public byte[] call() throws Exception {
+        try {
+          return server.callEndpoint(epName, methodName, params, regionName,
+              startRow, stopRow);
+        } catch (IOException ioe) {
+          throw new ThriftHBaseException(ioe);
+        }
+      }
+    });
   }
 
   @Override
-  public List<List<Bucket>> getHistograms(List<byte[]> regionNames)
-    throws ThriftHBaseException {
-    try {
-      return server.getHistograms(regionNames);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<List<List<Bucket>>> getHistograms(final List<byte[]> regionNames) {
+    return readService.submit(new Callable<List<List<Bucket>>>() {
+      @Override public List<List<Bucket>> call() throws Exception {
+        try {
+          return server.getHistograms(regionNames);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public HRegionLocation getLocation(byte[] table, byte[] row, boolean reload)
-    throws ThriftHBaseException {
-    try {
-      return server.getLocation(table, row, reload);
-    } catch (IOException e) {
-      throw new ThriftHBaseException(e);
-    }
+  public ListenableFuture<HRegionLocation> getLocation(
+      final byte[] table, final byte[] row, final boolean reload)  {
+
+    return readService.submit(new Callable<HRegionLocation>() {
+      @Override public HRegionLocation call() throws Exception {
+        try {
+          return server.getLocation(table, row, reload);
+        } catch (IOException e) {
+          throw new ThriftHBaseException(e);
+        }
+      }
+    });
   }
 
   @Override
-  public ScannerResult scanOpen(byte[] regionName, Scan scan, int numberOfRows)
-      throws ThriftHBaseException {
-    return server.scanOpen(regionName, scan, numberOfRows);
+  public ListenableFuture<ScannerResult> scanOpen(
+      final byte[] regionName, final Scan scan, final int numberOfRows) {
+
+    return readService.submit(new Callable<ScannerResult>() {
+      @Override public ScannerResult call() throws Exception {
+        return server.scanOpen(regionName, scan, numberOfRows);
+      }
+    });
   }
 
   @Override
-  public ScannerResult scanNext(long id, int numberOfRows)
-      throws ThriftHBaseException {
-    return server.scanNext(id, numberOfRows);
+  public ListenableFuture<ScannerResult> scanNext(final long id, final int numberOfRows) {
+
+    return readService.submit(new Callable<ScannerResult>() {
+      @Override public ScannerResult call() throws Exception {
+        return server.scanNext(id, numberOfRows);
+      }
+    });
   }
 
   @Override
-  public boolean scanClose(long id) throws ThriftHBaseException {
-    return server.scanClose(id);
+  public ListenableFuture<Boolean> scanClose(final long id) {
+    return readService.submit(new Callable<Boolean>() {
+      @Override public Boolean call() throws Exception {
+        return server.scanClose(id);
+      }
+    });
+  }
+
+  /**
+   * If there is a 'regioninfo' value in the result, it would be in Writable
+   * serialized form. Add a thrift serialized HRegionInfo object for the
+   * non-Java objects.
+   */
+  private Result addThriftRegionInfoQualifierIfNeeded(Result tentativeResult) {
+    //TODO: Thrift has some problem serializing HRegionInfo. Since this method is only
+    // for C++ client, I temporarily disable it. After the problem is fixed we should
+    // remove the flag.
+    if (HConstants.DISABLE_THRIFT_REGION_INFO_QUALIFIER) {
+      return tentativeResult;
+    }
+
+    // Get the serialized HRegionInfo object
+    byte[] value = tentativeResult.searchValue(HConstants.CATALOG_FAMILY,
+        HConstants.REGIONINFO_QUALIFIER);
+    // If the value exists, then we need to do something, otherwise, we return
+    // the result untouched.
+    if (value != null && value.length > 0) {
+      try {
+        // Get the Writable-serialized HRegionInfo object.
+        HRegionInfo hri = (HRegionInfo) Writables.getWritable(value,
+            new HRegionInfo());
+        byte[] thriftSerializedHri =
+            Bytes.writeThriftBytes(hri, HRegionInfo.class);
+        // Create the KV with the thrift-serialized HRegionInfo
+        List<KeyValue> kvList = tentativeResult.getKvs();
+        KeyValue kv = new KeyValue(kvList.get(0).getRow(),
+            HConstants.CATALOG_FAMILY, HConstants.THRIFT_REGIONINFO_QUALIFIER,
+            thriftSerializedHri);
+        kvList.add(kv);
+        return new Result(kvList);
+      } catch (Exception e) {
+        // If failed, log the mistake and returns the original result
+        LOG.error("Thrift Serialization of the HRegionInfo object failed!");
+        e.printStackTrace();
+      }
+    }
+    return tentativeResult;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b30de6e/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 6a9fec1..facdc1b 100644
--- a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.SizeAwareThriftHRegionServer;
 import org.apache.hadoop.hbase.regionserver.ThriftHRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -439,8 +440,8 @@ public class MiniHBaseCluster {
     return hbaseCluster.getRegionServer(serverNumber);
   }
 
-  public ThriftHRegionInterface.Sync getThriftRegionServer(int serverNumber) {
-    return new ThriftHRegionServer(getRegionServer(serverNumber));
+  public ThriftHRegionInterface.Async getThriftRegionServer(int serverNumber) {
+    return new SizeAwareThriftHRegionServer(getRegionServer(serverNumber));
   }
 
   public List<HRegion> getRegions(byte[] tableName) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b30de6e/src/test/java/org/apache/hadoop/hbase/regionserver/FailureInjectingThriftHRegionServer.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/FailureInjectingThriftHRegionServer.java b/src/test/java/org/apache/hadoop/hbase/regionserver/FailureInjectingThriftHRegionServer.java
index 2015343..21bc107 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/FailureInjectingThriftHRegionServer.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/FailureInjectingThriftHRegionServer.java
@@ -68,7 +68,7 @@ public class FailureInjectingThriftHRegionServer extends ThriftHRegionServer {
   }
 
   @Override
-  public Result get(byte[] regionName, Get get) throws ThriftHBaseException {
+  public ListenableFuture<Result> get(byte[] regionName, Get get) {
     switch (failureType) {
       // Region server will throw RegionOverloadedException, wrapped in ThriftHBaseException
       case REGIONOVERLOADEDEXCEPTION:
@@ -77,8 +77,8 @@ public class FailureInjectingThriftHRegionServer extends ThriftHRegionServer {
           return super.get(regionName, get);
         }
         LOG.debug("Exception repeat count: " + repeatCount);
-        throw new ThriftHBaseException(
-            new RegionOverloadedException("GetAsync Test ROE", HConstants.DEFAULT_HBASE_CLIENT_PAUSE));
+        return Futures.immediateFailedFuture(new ThriftHBaseException(
+            new RegionOverloadedException("GetAsync Test ROE", HConstants.DEFAULT_HBASE_CLIENT_PAUSE)));
 
       // Region server will disconnect the client, cause TTransportException on client side
       case STOP:
@@ -101,7 +101,7 @@ public class FailureInjectingThriftHRegionServer extends ThriftHRegionServer {
           return super.get(regionName, get);
         }
         LOG.debug("Exception repeat count: " + repeatCount);
-        throw new ThriftHBaseException(getRetriableExceptions(repeatCount));
+        return Futures.immediateFailedFuture(new ThriftHBaseException(getRetriableExceptions(repeatCount)));
 
       case DONOTRETRYEXCEPTION:
         if (++repeatCount > repeats) {
@@ -109,8 +109,8 @@ public class FailureInjectingThriftHRegionServer extends ThriftHRegionServer {
           return super.get(regionName, get);
         }
         LOG.debug("Exception repeat count: " + repeatCount);
-        throw new ThriftHBaseException(
-            new DoNotRetryIOException("GetAsync Test DoNotRetryIOE"));
+        return Futures.immediateFailedFuture( new ThriftHBaseException(
+            new DoNotRetryIOException("GetAsync Test DoNotRetryIOE")));
 
       default:
         return super.get(regionName, get);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b30de6e/src/test/java/org/apache/hadoop/hbase/thrift/swift/TestHRegionInterfaceSimpleFunctions.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/thrift/swift/TestHRegionInterfaceSimpleFunctions.java b/src/test/java/org/apache/hadoop/hbase/thrift/swift/TestHRegionInterfaceSimpleFunctions.java
index 9af0b66..8667a8e 100644
--- a/src/test/java/org/apache/hadoop/hbase/thrift/swift/TestHRegionInterfaceSimpleFunctions.java
+++ b/src/test/java/org/apache/hadoop/hbase/thrift/swift/TestHRegionInterfaceSimpleFunctions.java
@@ -73,7 +73,7 @@ public class TestHRegionInterfaceSimpleFunctions {
       throws IOException, InterruptedException, ExecutionException, TimeoutException, ThriftHBaseException {
     HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
     HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
-    ThriftHRegionInterface.Sync thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
+    ThriftHRegionInterface.Async thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
     TEST_UTIL.loadTable(table, FAMILY);
 
     HServerInfo info = server.getHServerInfo();
@@ -151,7 +151,7 @@ public class TestHRegionInterfaceSimpleFunctions {
     assertEquals(client.getHLogsList(false), server.getHLogsList(false));
 
     // getReginoAssignment
-    List<HRegionInfo> infos = thriftServer.getRegionsAssignment();
+    List<HRegionInfo> infos = thriftServer.getRegionsAssignment().get();
     assertFalse(infos.isEmpty());
   }
 
@@ -161,7 +161,7 @@ public class TestHRegionInterfaceSimpleFunctions {
   @Test
   public void testAtomicMutation() throws Exception {
     HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
-    ThriftHRegionInterface.Sync thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
+    ThriftHRegionInterface.Async thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
 
     byte[] row = Bytes.toBytes("test-row");
     byte[] invalidValue = Bytes.toBytes("test-row2");
@@ -169,18 +169,18 @@ public class TestHRegionInterfaceSimpleFunctions {
     byte[] regionName = regionInfo.getRegionName();
     Put put = new Put(row);
     put.add(FAMILY, null, row);
-    assertTrue(thriftServer.checkAndPut(regionName, row, FAMILY, null, null, put));
+    assertTrue(thriftServer.checkAndPut(regionName, row, FAMILY, null, null, put).get());
     assertFalse(thriftServer.checkAndPut(
-        regionInfo.getRegionName(), row, FAMILY, null, invalidValue, put));
+        regionInfo.getRegionName(), row, FAMILY, null, invalidValue, put).get());
     Delete delete = new Delete(row);
     delete.deleteFamily(FAMILY);
-    assertFalse(thriftServer.checkAndDelete(regionName, row, FAMILY, null, invalidValue, delete));
-    assertTrue(thriftServer.checkAndDelete(regionName, row, FAMILY, null, row, delete));
+    assertFalse(thriftServer.checkAndDelete(regionName, row, FAMILY, null, invalidValue, delete).get());
+    assertTrue(thriftServer.checkAndDelete(regionName, row, FAMILY, null, row, delete).get());
 
     put = new Put(row);
     put.add(FAMILY, null, Bytes.toBytes(1L));
-    thriftServer.put(regionName, put);
-    long result = thriftServer.incrementColumnValue(regionName, row, FAMILY, null, 100L, false);
+    thriftServer.put(regionName, put).get();
+    long result = thriftServer.incrementColumnValue(regionName, row, FAMILY, null, 100L, false).get();
     assertEquals(101L, result);
   }
 
@@ -190,13 +190,13 @@ public class TestHRegionInterfaceSimpleFunctions {
   @Test
   public void testQuorumConfigurationChanges() throws Exception {
     HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
-    ThriftHRegionInterface.Sync thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
+    ThriftHRegionInterface.Async thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
 
     int threads = server.getQuorumReadThreadsMax() + 1;
     long timeout = server.getQuorumReadTimeoutMillis() + 1;
 
-    thriftServer.setNumHDFSQuorumReadThreads(threads);
-    thriftServer.setHDFSQuorumReadTimeoutMillis(timeout);
+    thriftServer.setNumHDFSQuorumReadThreads(threads).get();
+    thriftServer.setHDFSQuorumReadTimeoutMillis(timeout).get();
 
     assertEquals(threads, server.getQuorumReadThreadsMax());
     assertEquals(timeout, server.getQuorumReadTimeoutMillis());
@@ -208,7 +208,7 @@ public class TestHRegionInterfaceSimpleFunctions {
   @Test
   public void testCloseRegion() throws Exception {
     HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
-    ThriftHRegionInterface.Sync thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
+    ThriftHRegionInterface.Async thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
 
     HRegion[] region = server.getOnlineRegionsAsArray();
     HRegionInfo regionInfo = region[0].getRegionInfo();
@@ -218,7 +218,7 @@ public class TestHRegionInterfaceSimpleFunctions {
     String regionZNode = zkWrapper.getZNode(
         zkWrapper.getRegionInTransitionZNode(), regionInfo.getEncodedName());
 
-    thriftServer.closeRegion(regionInfo, true);
+    thriftServer.closeRegion(regionInfo, true).get();
 
     byte[] data = zkWrapper.readZNode(regionZNode, new Stat());
     RegionTransitionEventData rsData = new RegionTransitionEventData();
@@ -235,11 +235,11 @@ public class TestHRegionInterfaceSimpleFunctions {
    */
   @Test
   public void testStopRegionServer() throws Exception {
-    ThriftHRegionInterface.Sync thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
-    thriftServer.updateConfiguration();
+    ThriftHRegionInterface.Async thriftServer = TEST_UTIL.getHBaseCluster().getThriftRegionServer(0);
+    thriftServer.updateConfiguration().get();
 
     String why = "test reason";
     thriftServer.stop(why);
-    assertEquals(thriftServer.getStopReason(), why);
+    assertEquals(thriftServer.getStopReason().get(), why);
   }
 }


Mime
View raw message