hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject git commit: HBASE-12086 Fix bug of HTableMultipliexer
Date Fri, 26 Sep 2014 02:22:43 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 be6d35430 -> a01f1f8ef


HBASE-12086 Fix bug of HTableMultipliexer

Signed-off-by: Elliott Clark <eclark@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a01f1f8e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a01f1f8e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a01f1f8e

Branch: refs/heads/branch-1
Commit: a01f1f8ef9bb96d9571cf5c16ba9ab8c17a6e628
Parents: be6d354
Author: David Deng <daviddengcn@gmail.com>
Authored: Tue Sep 23 22:46:03 2014 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Thu Sep 25 19:17:20 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  27 ++-
 .../org/apache/hadoop/hbase/client/HTable.java  |  17 +-
 .../hadoop/hbase/client/HTableMultiplexer.java  | 207 +++++++++----------
 .../hbase/client/TestHTableMultiplexer.java     |  65 +++---
 4 files changed, 171 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a01f1f8e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index b461ed2..93d71a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -53,6 +52,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.htrace.Trace;
+
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -288,6 +288,10 @@ class AsyncProcess {
     this.rpcFactory = rpcFactory;
   }
 
+  /**
+   * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
+   *         RuntimeException
+   */
   private ExecutorService getPool(ExecutorService pool) {
     if (pool != null) return pool;
     if (this.pool != null) return this.pool;
@@ -352,8 +356,8 @@ class AsyncProcess {
           RegionLocations locs = hConnection.locateRegion(
               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null)
{
-            throw new IOException("#" + id + ", no location found, aborting submit for" +
-                " tableName=" + tableName + " rowkey=" + Arrays.toString(r.getRow()));
+            throw new IOException("#" + id + ", no location found, aborting submit for"
+                + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
           }
           loc = locs.getDefaultRegionLocation();
         } catch (IOException ex) {
@@ -383,15 +387,24 @@ class AsyncProcess {
 
     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
 
+    return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
+      locationErrors, locationErrorRows, actionsByServer, pool);
+  }
+
+  <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
+      List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult>
callback,
+      Object[] results, boolean needResults, List<Exception> locationErrors,
+      List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>>
actionsByServer,
+      ExecutorService pool) {
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
-        tableName, retainedActions, nonceGroup, pool, callback, null, needResults);
+      tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
     // Add location errors if any
     if (locationErrors != null) {
       for (int i = 0; i < locationErrors.size(); ++i) {
         int originalIndex = locationErrorRows.get(i);
         Row row = retainedActions.get(originalIndex).getAction();
         ars.manageError(originalIndex, row,
-            Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
+          Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
       }
     }
     ars.sendMultiAction(actionsByServer, 1, null, false);
@@ -406,7 +419,7 @@ class AsyncProcess {
    * @param actionsByServer the multiaction per server
    * @param nonceGroup Nonce group.
    */
-  private void addAction(ServerName server, byte[] regionName, Action<Row> action,
+  private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
     MultiAction<Row> multiAction = actionsByServer.get(server);
     if (multiAction == null) {
@@ -531,7 +544,7 @@ class AsyncProcess {
     return ars;
   }
 
-  private void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
+  private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
     if (!(r instanceof Append) && !(r instanceof Increment)) return;
     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a01f1f8e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 26da937..8a6575e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -319,6 +319,13 @@ public class HTable implements HTableInterface, RegionLocator {
   }
 
   /**
+   * @return maxKeyValueSize from configuration.
+   */
+  public static int getMaxKeyValueSize(Configuration conf) {
+    return conf.getInt("hbase.client.keyvalue.maxsize", -1);
+  }
+
+  /**
    * setup this HTable's parameter based on the passed configuration
    */
   private void finishSetup() throws IOException {
@@ -348,8 +355,7 @@ public class HTable implements HTableInterface, RegionLocator {
     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
     multiAp = this.connection.getAsyncProcess();
 
-    this.maxKeyValueSize = this.configuration.getInt(
-        "hbase.client.keyvalue.maxsize", -1);
+    this.maxKeyValueSize = getMaxKeyValueSize(this.configuration);
     this.closed = false;
   }
 
@@ -1470,7 +1476,12 @@ public class HTable implements HTableInterface, RegionLocator {
   }
 
   // validate for well-formedness
-  public void validatePut(final Put put) throws IllegalArgumentException{
+  public void validatePut(final Put put) throws IllegalArgumentException {
+    validatePut(put, maxKeyValueSize);
+  }
+
+  // validate for well-formedness
+  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException
{
     if (put.isEmpty()) {
       throw new IllegalArgumentException("No columns to insert");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a01f1f8e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 9f5e836..e8c6909 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -22,13 +22,12 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -41,8 +40,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -67,35 +67,35 @@ public class HTableMultiplexer {
   
   static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
 
-  private Map<TableName, HTable> tableNameToHTableMap;
-
   /** The map between each region server to its corresponding buffer queue */
-  private Map<HRegionLocation, LinkedBlockingQueue<PutStatus>>
-    serverToBufferQueueMap;
+  private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap
=
+      new ConcurrentHashMap<>();
 
   /** The map between each region server to its flush worker */
-  private Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap;
+  private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap =
+      new ConcurrentHashMap<>();
 
-  private Configuration conf;
-  private int retryNum;
+  private final Configuration conf;
+  private final ClusterConnection conn;
+  private final ExecutorService pool;
+  private final int retryNum;
   private int perRegionServerBufferQueueSize;
+  private final int maxKeyValueSize;
   
   /**
-   * 
    * @param conf The HBaseConfiguration
-   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put
ops 
-   *         for each region server before dropping the request.
+   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put
ops for
+   *          each region server before dropping the request.
    */
-  public HTableMultiplexer(Configuration conf,
-      int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException {
+  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
+      throws IOException {
     this.conf = conf;
-    this.serverToBufferQueueMap = new ConcurrentHashMap<HRegionLocation,
-      LinkedBlockingQueue<PutStatus>>();
-    this.serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
-    this.tableNameToHTableMap = new ConcurrentSkipListMap<TableName, HTable>();
+    this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
+    this.pool = HTable.getDefaultExecutor(conf);
     this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
+    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
   }
 
   /**
@@ -110,10 +110,6 @@ public class HTableMultiplexer {
     return put(tableName, put, this.retryNum);
   }
 
-  public boolean put(byte[] tableName, final Put put) throws IOException {
-    return put(TableName.valueOf(tableName), put);
-  }
-
   /**
    * The puts request will be buffered by their corresponding buffer queue. 
    * Return the list of puts which could not be queued.
@@ -165,15 +161,14 @@ public class HTableMultiplexer {
       return false;
     }
 
-    LinkedBlockingQueue<PutStatus> queue;
-    HTable htable = getHTable(tableName);
     try {
-      htable.validatePut(put);
-      HRegionLocation loc = htable.getRegionLocation(put.getRow(), false);
+      HTable.validatePut(put, maxKeyValueSize);
+      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
-        queue = addNewRegionServer(loc, htable);
-        // Generate a MultiPutStatus obj and offer it into the queue
+
+        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+        // Generate a MultiPutStatus object and offer it into the queue
         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
         
         return queue.offer(s);
@@ -196,43 +191,30 @@ public class HTableMultiplexer {
     return new HTableMultiplexerStatus(serverToFlushWorkerMap);
   }
 
-
-  private HTable getHTable(TableName tableName) throws IOException {
-    HTable htable = this.tableNameToHTableMap.get(tableName);
-    if (htable == null) {
-      synchronized (this.tableNameToHTableMap) {
-        htable = this.tableNameToHTableMap.get(tableName);
-        if (htable == null)  {
-          htable = new HTable(conf, tableName);
-          this.tableNameToHTableMap.put(tableName, htable);
+  private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
+    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
+    if (queue == null) {
+      synchronized (this.serverToBufferQueueMap) {
+        queue = serverToBufferQueueMap.get(addr);
+        if (queue == null) {
+          // Create a queue for the new region server
+          queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
+          serverToBufferQueueMap.put(addr, queue);
+
+          // Create the flush worker
+          HTableFlushWorker worker =
+              new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+          this.serverToFlushWorkerMap.put(addr, worker);
+
+          // Launch a daemon thread to flush the puts
+          // from the queue to its corresponding region server.
+          String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++);
+          Thread t = new Thread(worker, name);
+          t.setDaemon(true);
+          t.start();
         }
       }
     }
-    return htable;
-  }
-
-  private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(
-      HRegionLocation addr, HTable htable) {
-    LinkedBlockingQueue<PutStatus> queue =
-      serverToBufferQueueMap.get(addr);
-    if (queue == null) {
-      // Create a queue for the new region server
-      queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
-      serverToBufferQueueMap.put(addr, queue);
-
-      // Create the flush worker
-      HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
-          this, queue, htable);
-      this.serverToFlushWorkerMap.put(addr, worker);
-
-      // Launch a daemon thread to flush the puts
-      // from the queue to its corresponding region server.
-      String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-"
-          + (poolID++);
-      Thread t = new Thread(worker, name);
-      t.setDaemon(true);
-      t.start();
-    }
     return queue;
   }
 
@@ -405,28 +387,25 @@ public class HTableMultiplexer {
   }
 
   private static class HTableFlushWorker implements Runnable {
-    private HRegionLocation addr;
-    private Configuration conf;
-    private LinkedBlockingQueue<PutStatus> queue;
-    private HTableMultiplexer htableMultiplexer;
-    private AtomicLong totalFailedPutCount;
-    private AtomicInteger currentProcessingPutCount;
-    private AtomicAverageCounter averageLatency;
-    private AtomicLong maxLatency;
-    private HTable htable; // For Multi
+    private final HRegionLocation addr;
+    private final Configuration conf;
+    private final ClusterConnection conn;
+    private final LinkedBlockingQueue<PutStatus> queue;
+    private final HTableMultiplexer htableMultiplexer;
+    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
+    private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0);
+    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
+    private final AtomicLong maxLatency = new AtomicLong(0);
+    private final ExecutorService pool;
     
-    public HTableFlushWorker(Configuration conf, HRegionLocation addr,
-        HTableMultiplexer htableMultiplexer,
-        LinkedBlockingQueue<PutStatus> queue, HTable htable) {
+    public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation
addr,
+        HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue,
ExecutorService pool) {
       this.addr = addr;
       this.conf = conf;
+      this.conn = conn;
       this.htableMultiplexer = htableMultiplexer;
       this.queue = queue;
-      this.totalFailedPutCount = new AtomicLong(0);
-      this.currentProcessingPutCount = new AtomicInteger(0);
-      this.averageLatency = new AtomicAverageCounter();
-      this.maxLatency = new AtomicLong(0);
-      this.htable = htable;
+      this.pool = pool;
     }
 
     public long getTotalFailedCount() {
@@ -466,7 +445,7 @@ public class HTableMultiplexer {
     @edu.umd.cs.findbugs.annotations.SuppressWarnings
         (value = "REC_CATCH_EXCEPTION", justification = "na")
     public void run() {
-      List<PutStatus> processingList = new ArrayList<PutStatus>();
+      List<PutStatus> processingList = new ArrayList<>();
       /** 
        * The frequency in milliseconds for the current thread to process the corresponding
 
        * buffer queue.  
@@ -481,6 +460,8 @@ public class HTableMultiplexer {
         Thread.currentThread().interrupt();
       }
 
+      AsyncProcess ap = conn.getAsyncProcess();
+
       long start, elapsed;
       int failedCount = 0;
       while (true) {
@@ -496,16 +477,29 @@ public class HTableMultiplexer {
           currentProcessingPutCount.set(processingList.size());
 
           if (processingList.size() > 0) {
-            ArrayList<Put> list = new ArrayList<Put>(processingList.size());
-            for (PutStatus putStatus: processingList) {
-              list.add(putStatus.getPut());
+            List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
+            MultiAction<Row> actions = new MultiAction<>();
+            for (int i = 0; i < processingList.size(); i++) {
+              PutStatus putStatus = processingList.get(i);
+              Action<Row> action = new Action<Row>(putStatus.getPut(), i);
+              actions.add(putStatus.getRegionInfo().getRegionName(), action);
+              retainedActions.add(action);
             }
             
-            // Process this multiput request
-            List<Put> failed = null;
-            Object[] results = new Object[list.size()];
+            // Process this multi-put request
+            List<PutStatus> failed = null;
+            Object[] results = new Object[actions.size()];
+            ServerName server = addr.getServerName();
+            Map<ServerName, MultiAction<Row>> actionsByServer =
+                Collections.singletonMap(server, actions);
             try {
-              htable.batch(list, results);
+              AsyncRequestFuture arf =
+                  ap.submitMultiActions(null, retainedActions, 0L, null, results,
+                    true, null, null, actionsByServer, pool);
+              arf.waitUntilDone();
+              if (arf.hasError()) {
+                throw arf.getErrors();
+              }
             } catch (IOException e) {
               LOG.debug("Caught some exceptions " + e
                   + " when flushing puts to region server " + addr.getHostnamePort());
@@ -515,35 +509,26 @@ public class HTableMultiplexer {
               // results are returned in the same order as the requests in list
               // walk the list backwards, so we can remove from list without
               // impacting the indexes of earlier members
-              for (int i = results.length - 1; i >= 0; i--) {
-                if (results[i] instanceof Result) {
-                  // successful Puts are removed from the list here.
-                  list.remove(i);
+              for (int i = 0; i < results.length; i++) {
+                if (results[i] == null) {
+                  if (failed == null) {
+                    failed = new ArrayList<PutStatus>();
+                  }
+                  failed.add(processingList.get(i));
                 }
               }
-              failed = list;
             }
 
             if (failed != null) {
-              if (failed.size() == processingList.size()) {
-                // All the puts for this region server are failed. Going to retry it later
-                for (PutStatus putStatus: processingList) {
-                  if (!resubmitFailedPut(putStatus, this.addr)) {
-                    failedCount++;
-                  }
-                }
-              } else {
-                Set<Put> failedPutSet = new HashSet<Put>(failed);
-                for (PutStatus putStatus: processingList) {
-                  if (failedPutSet.contains(putStatus.getPut())
-                      && !resubmitFailedPut(putStatus, this.addr)) {
-                    failedCount++;
-                  }
+              // Resubmit failed puts
+              for (PutStatus putStatus : processingList) {
+                if (!resubmitFailedPut(putStatus, this.addr)) {
+                  failedCount++;
                 }
               }
+              // Update the totalFailedCount
+              this.totalFailedPutCount.addAndGet(failedCount);
             }
-            // Update the totalFailedCount
-            this.totalFailedPutCount.addAndGet(failedCount);
             
             elapsed = EnvironmentEdgeManager.currentTime() - start;
             // Update latency counters
@@ -580,7 +565,7 @@ public class HTableMultiplexer {
           // Log all the exceptions and move on
           LOG.debug("Caught some exceptions " + e
               + " when flushing puts to region server "
-              + addr.getHostnamePort());
+                + addr.getHostnamePort(), e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a01f1f8e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
index 2f0bf37..cd85c38 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
@@ -27,9 +27,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -63,10 +63,27 @@ public class TestHTableMultiplexer {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality)
+      throws Exception {
+    // verify that the Get returns the correct result
+    Result r;
+    Get get = new Get(row);
+    get.addColumn(FAMILY, QUALIFIER);
+    int nbTry = 0;
+    do {
+      assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry
< 50);
+      nbTry++;
+      Thread.sleep(100);
+      r = htable.get(get);
+    } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
+    assertEquals("value", Bytes.toStringBinary(VALUE1),
+      Bytes.toStringBinary(r.getValue(FAMILY, QUALIFIER)));
+  }
+
   @Test
   public void testHTableMultiplexer() throws Exception {
-    TableName TABLE =
-        TableName.valueOf("testHTableMultiplexer");
+    TableName TABLE_1 = TableName.valueOf("testHTableMultiplexer_1");
+    TableName TABLE_2 = TableName.valueOf("testHTableMultiplexer_2");
     final int NUM_REGIONS = 10;
     final int VERSION = 3;
     List<Put> failedPuts;
@@ -75,35 +92,35 @@ public class TestHTableMultiplexer {
     HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 
         PER_REGIONSERVER_QUEUE_SIZE);
 
-    HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION,
+    HTable htable1 =
+        TEST_UTIL.createTable(TABLE_1, new byte[][] { FAMILY }, VERSION,
         Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
-    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
+    HTable htable2 =
+        TEST_UTIL.createTable(TABLE_2, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"),
+          Bytes.toBytes("zzzzz"), NUM_REGIONS);
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_1);
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_2);
 
-    byte[][] startRows = ht.getStartKeys();
-    byte[][] endRows = ht.getEndKeys();
+    byte[][] startRows = htable1.getStartKeys();
+    byte[][] endRows = htable1.getEndKeys();
 
     // SinglePut case
     for (int i = 0; i < NUM_REGIONS; i++) {
       byte [] row = startRows[i];
       if (row == null || row.length <= 0) continue;
-      Put put = new Put(row);
-      put.add(FAMILY, QUALIFIER, VALUE1);
-      success = multiplexer.put(TABLE, put);
-      assertTrue(success);
+      Put put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
+      success = multiplexer.put(TABLE_1, put);
+      assertTrue("multiplexer.put returns", success);
+
+      put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
+      success = multiplexer.put(TABLE_2, put);
+      assertTrue("multiplexer.put failed", success);
 
-      LOG.info("Put for " + Bytes.toString(startRows[i]) + " @ iteration " + (i+1));
+      LOG.info("Put for " + Bytes.toStringBinary(startRows[i]) + " @ iteration " + (i + 1));
 
       // verify that the Get returns the correct result
-      Get get = new Get(startRows[i]);
-      get.addColumn(FAMILY, QUALIFIER);
-      Result r;
-      int nbTry = 0;
-      do {
-        assertTrue(nbTry++ < 50);
-        Thread.sleep(100);
-        r = ht.get(get);
-      } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
-      assertEquals(0, Bytes.compareTo(VALUE1, r.getValue(FAMILY, QUALIFIER)));
+      checkExistence(htable1, startRows[i], FAMILY, QUALIFIER);
+      checkExistence(htable2, startRows[i], FAMILY, QUALIFIER);
     }
 
     // MultiPut case
@@ -115,7 +132,7 @@ public class TestHTableMultiplexer {
       put.add(FAMILY, QUALIFIER, VALUE2);
       multiput.add(put);
     }
-    failedPuts = multiplexer.put(TABLE, multiput);
+    failedPuts = multiplexer.put(TABLE_1, multiput);
     assertTrue(failedPuts == null);
 
     // verify that the Get returns the correct result
@@ -129,7 +146,7 @@ public class TestHTableMultiplexer {
       do {
         assertTrue(nbTry++ < 50);
         Thread.sleep(100);
-        r = ht.get(get);
+        r = htable1.get(get);
       } while (r == null || r.getValue(FAMILY, QUALIFIER) == null ||
           Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0);
     }


Mime
View raw message