hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1528340 - in /hbase/trunk/hbase-client/src: main/java/org/apache/hadoop/hbase/client/AsyncProcess.java test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Date Wed, 02 Oct 2013 08:08:37 GMT
Author: nkeywal
Date: Wed Oct  2 08:08:36 2013
New Revision: 1528340

URL: http://svn.apache.org/r1528340
Log:
HBASE-9676 AsyncProcess can create more tasks than hbase.client.max.total.tasks

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1528340&r1=1528339&r2=1528340&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Wed Oct  2 08:08:36 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -87,6 +88,7 @@ import org.cloudera.htrace.Trace;
  */
 class AsyncProcess<CResult> {
   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
+  private final static int START_LOG_ERRORS_CNT = 4;
   protected final HConnection hConnection;
   protected final TableName tableName;
   protected final ExecutorService pool;
@@ -98,8 +100,26 @@ class AsyncProcess<CResult> {
   protected final AtomicLong tasksDone = new AtomicLong(0);
   protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
       new ConcurrentHashMap<String, AtomicInteger>();
+  protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
+      new ConcurrentHashMap<ServerName, AtomicInteger>();
+
+  /**
+   * The number of tasks simultaneously executed on the cluster.
+   */
   protected final int maxTotalConcurrentTasks;
+
+  /**
+   * The number of tasks we run in parallel on a single region.
+   * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
+   * a set of operations on a region before the previous one is done. As well, this limits
+   * the pressure we put on the region server.
+   */
   protected final int maxConcurrentTasksPerRegion;
+
+  /**
+   * The number of task simultaneously executed on a single region server.
+   */
+  protected final int maxConcurrentTasksPerServer;
   protected final long pause;
   protected int numTries;
   protected final boolean useServerTrackerForRetries;
@@ -191,13 +211,22 @@ class AsyncProcess<CResult> {
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
 
-    this.maxTotalConcurrentTasks = conf.getInt("hbase.client.max.total.tasks", 200);
-
-    // With one, we ensure that the ordering of the queries is respected: we don't start
-    //  a set of operations on a region before the previous one is done. As well, this limits
-    //  the pressure we put on the region server.
+    this.maxTotalConcurrentTasks = conf.getInt("hbase.client.max.total.tasks", 100);
+    this.maxConcurrentTasksPerServer = conf.getInt("hbase.client.max.perserver.tasks", 5);
     this.maxConcurrentTasksPerRegion = conf.getInt("hbase.client.max.perregion.tasks", 1);
 
+    if (this.maxTotalConcurrentTasks <= 0) {
+      throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
+    }
+    if (this.maxConcurrentTasksPerServer <= 0) {
+      throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
+          maxConcurrentTasksPerServer);
+    }
+    if (this.maxConcurrentTasksPerRegion <= 0) {
+      throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
+          maxConcurrentTasksPerRegion);
+    }
+
     this.useServerTrackerForRetries =
         conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
 
@@ -235,15 +264,22 @@ class AsyncProcess<CResult> {
     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
 
     do {
+      // Wait until there is at least one slot for a new task.
+      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
+
+      // Remember the previous decisions about regions or region servers we put in the
+      //  final multi.
       Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
-      long currentTaskNumber = waitForMaximumCurrentTasks(maxTotalConcurrentTasks);
+      Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
+
       int posInList = -1;
       Iterator<? extends Row> it = rows.iterator();
       while (it.hasNext()) {
         Row r = it.next();
-        HRegionLocation loc = findDestLocation(r, 1, posInList, false, regionIncluded);
+        HRegionLocation loc = findDestLocation(r, 1, posInList);
 
-        if (loc != null) {   // loc is null if the dest is too busy or there is an error
+        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
+          // loc is null if there is an error such as meta not available.
           Action<Row> action = new Action<Row>(r, ++posInList);
           retainedActions.add(action);
           addAction(loc, action, actionsByServer);
@@ -251,10 +287,6 @@ class AsyncProcess<CResult> {
         }
       }
 
-      if (retainedActions.isEmpty() && atLeastOne && !hasError()) {
-        waitForNextTaskDone(currentTaskNumber);
-      }
-
     } while (retainedActions.isEmpty() && atLeastOne && !hasError());
 
     HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
@@ -281,18 +313,14 @@ class AsyncProcess<CResult> {
   }
 
   /**
-   * Find the destination, if this destination is not considered as busy.
+   * Find the destination.
    *
    * @param row          the row
    * @param numAttempt   the num attempt
    * @param posInList    the position in the list
-   * @param force        if we must submit whatever the server load
-   * @param regionStatus the
-   * @return null if we should not submit, the destination otherwise.
-   */
-  private HRegionLocation findDestLocation(Row row, int numAttempt,
-                                           int posInList, boolean force,
-                                           Map<String, Boolean> regionStatus) {
+   * @return the destination. Null if we couldn't find it.
+   */
+  private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
     if (row == null){
       throw new IllegalArgumentException("row cannot be null");
     }
@@ -316,30 +344,75 @@ class AsyncProcess<CResult> {
       return null;
     }
 
-    if (force) {
-      return loc;
-    }
-
-    String regionName = loc.getRegionInfo().getEncodedName();
-    Boolean addIt = regionStatus.get(regionName);
-    if (addIt == null) {
-      addIt = canTakeNewOperations(regionName);
-      regionStatus.put(regionName, addIt);
-    }
-
-    return addIt ? loc : null;
+    return loc;
   }
 
-
   /**
-   * Check if we should send new operations to this region.
+   * Check if we should send new operations to this region or region server.
+   * We're taking into account the past decision; if we have already accepted
+   * operation on a given region, we accept all operations for this region.
+   *
    *
-   * @param encodedRegionName region name
+   * @param loc; the region and the server name we want to use.
    * @return true if this region is considered as busy.
    */
-  protected boolean canTakeNewOperations(String encodedRegionName) {
-    AtomicInteger ct = taskCounterPerRegion.get(encodedRegionName);
-    return ct == null || ct.get() < maxConcurrentTasksPerRegion;
+  protected boolean canTakeOperation(HRegionLocation loc,
+                                     Map<String, Boolean> regionsIncluded,
+                                     Map<ServerName, Boolean> serversIncluded) {
+    String encodedRegionName = loc.getRegionInfo().getEncodedName();
+    Boolean regionPrevious = regionsIncluded.get(encodedRegionName);
+
+    if (regionPrevious != null) {
+      // We already know what to do with this region.
+      return regionPrevious;
+    }
+
+    Boolean serverPrevious = serversIncluded.get(loc.getServerName());
+    if (Boolean.FALSE.equals(serverPrevious)) {
+      // It's a new region, on a region server that we have already excluded.
+      regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+      return false;
+    }
+
+    AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+    if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
+      // Too many tasks on this region already.
+      regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+      return false;
+    }
+
+    if (serverPrevious == null) {
+      // The region is ok, but we need to decide for this region server.
+      int newServers = 0; // number of servers we're going to contact so far
+      for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
+        if (kv.getValue()) {
+          newServers++;
+        }
+      }
+
+      // Do we have too many total tasks already?
+      boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks;
+
+      if (ok) {
+        // If the total is fine, is it ok for this individual server?
+        AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
+        ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
+      }
+
+      if (!ok) {
+        regionsIncluded.put(encodedRegionName, Boolean.FALSE);
+        serversIncluded.put(loc.getServerName(), Boolean.FALSE);
+        return false;
+      }
+
+      serversIncluded.put(loc.getServerName(), Boolean.TRUE);
+    } else {
+      assert serverPrevious.equals(Boolean.TRUE);
+    }
+
+    regionsIncluded.put(encodedRegionName, Boolean.TRUE);
+
+    return true;
   }
 
   /**
@@ -359,35 +432,27 @@ class AsyncProcess<CResult> {
       actions.add(action);
     }
     HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
-    submit(actions, actions, 1, true, errorsByServer);
+    submit(actions, actions, 1, errorsByServer);
   }
 
 
   /**
    * Group a list of actions per region servers, and send them. The created MultiActions are
-   * added to the inProgress list.
+   * added to the inProgress list. Does not take into account the region/server load.
    *
    * @param initialActions - the full list of the actions in progress
    * @param currentActions - the list of row to submit
    * @param numAttempt - the current numAttempt (first attempt is 1)
-   * @param force - true if we submit the rowList without taking into account the server load
    */
   private void submit(List<Action<Row>> initialActions,
-                      List<Action<Row>> currentActions, int numAttempt, boolean force,
+                      List<Action<Row>> currentActions, int numAttempt,
                       final HConnectionManager.ServerErrorTracker errorsByServer) {
     // group per location => regions server
     final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
         new HashMap<HRegionLocation, MultiAction<Row>>();
 
-    // We have the same policy for a single region per call to submit: we don't want
-    //  to send half of the actions because the status changed in the middle. So we keep the
-    //  status
-    Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
-
     for (Action<Row> action : currentActions) {
-      HRegionLocation loc = findDestLocation(
-          action.getAction(), 1, action.getOriginalIndex(), force, regionIncluded);
-
+      HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex());
       if (loc != null) {
         addAction(loc, action, actionsByServer);
       }
@@ -414,7 +479,7 @@ class AsyncProcess<CResult> {
     for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
       final HRegionLocation loc = e.getKey();
       final MultiAction<Row> multi = e.getValue();
-      incTaskCounters(multi.getRegions());
+      incTaskCounters(multi.getRegions(), loc.getServerName());
 
       Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
         @Override
@@ -433,7 +498,7 @@ class AsyncProcess<CResult> {
 
             receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
           } finally {
-            decTaskCounters(multi.getRegions());
+            decTaskCounters(multi.getRegions(), loc.getServerName());
           }
         }
       });
@@ -443,7 +508,7 @@ class AsyncProcess<CResult> {
       } catch (RejectedExecutionException ree) {
         // This should never happen. But as the pool is provided by the end user, let's secure
         //  this a little.
-        decTaskCounters(multi.getRegions());
+        decTaskCounters(multi.getRegions(), loc.getServerName());
         LOG.warn("The task was rejected by the pool. This is unexpected." +
             " Server is " + loc.getServerName(), ree);
         // We're likely to fail again, but this will increment the attempt counter, so it will
@@ -463,11 +528,10 @@ class AsyncProcess<CResult> {
 
   /**
    * For tests.
-   * @param callable
+   * @param callable: used in tests.
    * @return Returns a caller.
    */
   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
-    // callable is unused.
     return rpcCallerFactory.<MultiResponse> newCaller();
   }
 
@@ -543,7 +607,7 @@ class AsyncProcess<CResult> {
       LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
         initialActions.size() + "ops, NOT resubmitting, " + location.getServerName());
     } else {
-      submit(initialActions, toReplay, numAttempt, true, errorsByServer);
+      submit(initialActions, toReplay, numAttempt, errorsByServer);
     }
   }
 
@@ -619,7 +683,7 @@ class AsyncProcess<CResult> {
       long backOffTime = (errorsByServer != null ?
           errorsByServer.calculateBackoffTime(location, pause) :
           ConnectionUtils.getPauseTime(pause, numAttempt));
-      if (numAttempt > 3 && LOG.isDebugEnabled()) {
+      if (numAttempt > START_LOG_ERRORS_CNT && LOG.isDebugEnabled()) {
         // We use this value to have some logs when we have multiple failures, but not too many
         //  logs, as errors are to be expected when a region moves, splits and so on
         LOG.debug("Attempt #" + numAttempt + "/" + numTries + " failed " + failureCount +
@@ -636,10 +700,16 @@ class AsyncProcess<CResult> {
         return;
       }
 
-      submit(initialActions, toReplay, numAttempt + 1, true, errorsByServer);
-    } else if (failureCount != 0) {
-      LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
-          " ops on " + location.getServerName() + " NOT resubmitting." + location);
+      submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
+    } else {
+      if (failureCount != 0) {
+        // We have a failure but nothing to retry. We're done, it's a final failure..
+        LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
+            " ops on " + location.getServerName() + " NOT resubmitting." + location);
+      } else if (numAttempt > START_LOG_ERRORS_CNT + 1 && LOG.isDebugEnabled()) {
+        // The operation was successful, but needed several attempts. Let's log this.
+        LOG.debug("Attempt #" + numAttempt + "/" + numTries + " is finally successful.");
+      }
     }
   }
 
@@ -664,7 +734,7 @@ class AsyncProcess<CResult> {
   /**
    * Wait until the async does not have more than max tasks in progress.
    */
-  private long waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
+  private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
     long lastLog = EnvironmentEdgeManager.currentTimeMillis();
     long currentTasksDone = this.tasksDone.get();
 
@@ -679,8 +749,10 @@ class AsyncProcess<CResult> {
       waitForNextTaskDone(currentTasksDone);
       currentTasksDone = this.tasksDone.get();
     }
+  }
 
-    return currentTasksDone;
+  private long getCurrentTasksCount(){
+    return  tasksSent.get() - tasksDone.get();
   }
 
   /**
@@ -715,30 +787,39 @@ class AsyncProcess<CResult> {
   /**
    * increment the tasks counters for a given set of regions. MT safe.
    */
-  protected void incTaskCounters(Collection<byte[]> regions) {
+  protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
     tasksSent.incrementAndGet();
 
+    AtomicInteger serverCnt = taskCounterPerServer.get(sn);
+    if (serverCnt == null) {
+      taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
+      serverCnt = taskCounterPerServer.get(sn);
+    }
+    serverCnt.incrementAndGet();
+
     for (byte[] regBytes : regions) {
       String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
-      AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
-      if (counterPerServer == null) {
+      AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+      if (regionCnt == null) {
         taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
-        counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+        regionCnt = taskCounterPerRegion.get(encodedRegionName);
       }
-      counterPerServer.incrementAndGet();
+      regionCnt.incrementAndGet();
     }
   }
 
   /**
-   * Decrements the counters for a given region
+   * Decrements the counters for a given region and the region server. MT Safe.
    */
-  protected void decTaskCounters(Collection<byte[]> regions) {
+  protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
     for (byte[] regBytes : regions) {
       String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
-      AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
-      counterPerServer.decrementAndGet();
+      AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
+      regionCnt.decrementAndGet();
     }
 
+    taskCounterPerServer.get(sn).decrementAndGet();
+
     tasksDone.incrementAndGet();
     synchronized (tasksDone) {
       tasksDone.notifyAll();

Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1528340&r1=1528339&r2=1528340&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Wed Oct  2 08:08:36 2013
@@ -40,7 +40,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -55,6 +54,7 @@ public class TestAsyncProcess {
       TableName.valueOf("DUMMY_TABLE");
   private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
   private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
+  private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
   private static final byte[] FAILS = "FAILS".getBytes();
   private static final Configuration conf = new Configuration();
 
@@ -63,8 +63,11 @@ public class TestAsyncProcess {
   private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
   private static HRegionInfo hri2 =
       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
+  private static HRegionInfo hri3 =
+      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW);
   private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
   private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
+  private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
 
   private static final String success = "success";
   private static Exception failure = new Exception("failure");
@@ -180,10 +183,15 @@ public class TestAsyncProcess {
     @Override
     public HRegionLocation locateRegion(final TableName tableName,
                                         final byte[] row) {
-      Random rd = new Random(Bytes.toLong(row));
-      int pos = rd.nextInt(hrl.size());
-      usedRegions[pos] = true;
-      return hrl.get(pos);
+      int i = 0;
+      for (HRegionLocation hr:hrl){
+        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
+          usedRegions[i] = true;
+          return hr;
+        }
+        i++;
+      }
+      return null;
     }
   }
 
@@ -193,7 +201,7 @@ public class TestAsyncProcess {
     AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
 
     List<Put> puts = new ArrayList<Put>();
-    puts.add(createPut(true, true));
+    puts.add(createPut(1, true));
 
     ap.submit(puts, false);
     Assert.assertTrue(puts.isEmpty());
@@ -206,7 +214,7 @@ public class TestAsyncProcess {
     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
 
     List<Put> puts = new ArrayList<Put>();
-    puts.add(createPut(true, true));
+    puts.add(createPut(1, true));
 
     ap.submit(puts, false);
     Assert.assertTrue(puts.isEmpty());
@@ -223,13 +231,35 @@ public class TestAsyncProcess {
     AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
 
     List<Put> puts = new ArrayList<Put>();
-    puts.add(createPut(true, true));
+    puts.add(createPut(1, true));
 
-    ap.incTaskCounters(Arrays.asList(hri1.getRegionName()));
+    ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
     ap.submit(puts, false);
     Assert.assertEquals(puts.size(), 1);
 
-    ap.decTaskCounters(Arrays.asList(hri1.getRegionName()));
+    ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
+    ap.submit(puts, false);
+    Assert.assertTrue(puts.isEmpty());
+  }
+
+
+  @Test
+  public void testSubmitBusyRegionServer() throws Exception {
+    HConnection hc = createHConnection();
+    AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, null, conf);
+
+    ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
+
+    List<Put> puts = new ArrayList<Put>();
+    puts.add(createPut(1, true));
+    puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
+    puts.add(createPut(1, true)); // <== this one will make it, the region is already in
+    puts.add(createPut(2, true)); // <== new region, but the rs is ok
+
+    ap.submit(puts, false);
+    Assert.assertEquals(1, puts.size());
+
+    ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
     ap.submit(puts, false);
     Assert.assertTrue(puts.isEmpty());
   }
@@ -241,7 +271,7 @@ public class TestAsyncProcess {
     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
 
     List<Put> puts = new ArrayList<Put>();
-    Put p = createPut(true, false);
+    Put p = createPut(1, false);
     puts.add(p);
 
     ap.submit(puts, false);
@@ -318,7 +348,7 @@ public class TestAsyncProcess {
     };
 
     List<Put> puts = new ArrayList<Put>();
-    Put p = createPut(true, true);
+    Put p = createPut(1, true);
     puts.add(p);
 
     ap.submit(puts, false);
@@ -342,9 +372,9 @@ public class TestAsyncProcess {
     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
 
     List<Put> puts = new ArrayList<Put>();
-    puts.add(createPut(true, false));
-    puts.add(createPut(true, true));
-    puts.add(createPut(true, true));
+    puts.add(createPut(1, false));
+    puts.add(createPut(1, true));
+    puts.add(createPut(1, true));
 
     ap.submit(puts, false);
     Assert.assertTrue(puts.isEmpty());
@@ -359,7 +389,7 @@ public class TestAsyncProcess {
     Assert.assertEquals(1, ap.getErrors().actions.size());
 
 
-    puts.add(createPut(true, true));
+    puts.add(createPut(1, true));
     ap.submit(puts, false);
     Assert.assertTrue(puts.isEmpty());
 
@@ -380,9 +410,9 @@ public class TestAsyncProcess {
     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
 
     List<Put> puts = new ArrayList<Put>();
-    puts.add(createPut(true, false));
-    puts.add(createPut(true, true));
-    puts.add(createPut(true, true));
+    puts.add(createPut(1, false));
+    puts.add(createPut(1, true));
+    puts.add(createPut(1, true));
 
     ap.submit(puts, false);
     ap.waitUntilDone();
@@ -400,7 +430,7 @@ public class TestAsyncProcess {
     final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
 
     for (int i = 0; i < 1000; i++) {
-      ap.incTaskCounters(Arrays.asList("dummy".getBytes()));
+      ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
     }
 
     final Thread myThread = Thread.currentThread();
@@ -413,7 +443,7 @@ public class TestAsyncProcess {
     };
 
     List<Put> puts = new ArrayList<Put>();
-    puts.add(createPut(true, true));
+    puts.add(createPut(1, true));
 
     t.start();
 
@@ -429,7 +459,7 @@ public class TestAsyncProcess {
       public void run() {
         Threads.sleep(sleepTime);
         while (ap.tasksDone.get() > 0) {
-          ap.decTaskCounters(Arrays.asList("dummy".getBytes()));
+          ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
         }
       }
     };
@@ -484,6 +514,11 @@ public class TestAsyncProcess {
         Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
 
     Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
+        Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2);
+    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
+        Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3);
+
+    Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
         Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
         Mockito.eq(FAILS))).thenReturn(loc2);
@@ -497,7 +532,7 @@ public class TestAsyncProcess {
     HConnection hc = createHConnection();
     ht.ap = new MyAsyncProcess<Object>(hc, null, conf);
 
-    Put put = createPut(true, true);
+    Put put = createPut(1, true);
 
     Assert.assertEquals(0, ht.getWriteBufferSize());
     ht.put(put);
@@ -516,7 +551,7 @@ public class TestAsyncProcess {
       ht.setWriteBufferSize(0L);
     }
 
-    Put put = createPut(true, false);
+    Put put = createPut(1, false);
 
     Assert.assertEquals(0L, ht.currentWriteBufferSize);
     try {
@@ -555,7 +590,7 @@ public class TestAsyncProcess {
     ht.setAutoFlush(false, true);
     ht.setWriteBufferSize(0);
 
-    Put p = createPut(true, false);
+    Put p = createPut(1, false);
     ht.put(p);
 
     ht.ap.waitUntilDone(); // Let's do all the retries.
@@ -565,7 +600,7 @@ public class TestAsyncProcess {
     // This said, it's not a very easy going behavior. For example, when we insert a list of
     //  puts, we may raise an exception in the middle of the list. It's then up to the caller to
     //  manage what was inserted, what was tried but failed, and what was not even tried.
-    p = createPut(true, true);
+    p = createPut(1, true);
     Assert.assertEquals(0, ht.writeAsyncBuffer.size());
     try {
       ht.put(p);
@@ -584,7 +619,7 @@ public class TestAsyncProcess {
     ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
     ht.setAutoFlush(false, false);
 
-    Put p = createPut(true, false);
+    Put p = createPut(1, false);
     ht.put(p);
     Assert.assertEquals(0, ht.writeAsyncBuffer.size());
     try {
@@ -606,13 +641,13 @@ public class TestAsyncProcess {
     ht.connection = new MyConnectionImpl();
 
     List<Put> puts = new ArrayList<Put>();
-    puts.add(createPut(true, true));
-    puts.add(createPut(true, true));
-    puts.add(createPut(true, true));
-    puts.add(createPut(true, true));
-    puts.add(createPut(true, false)); // <=== the bad apple, position 4
-    puts.add(createPut(true, true));
-    puts.add(createPut(true, false)); // <=== another bad apple, position 6
+    puts.add(createPut(1, true));
+    puts.add(createPut(1, true));
+    puts.add(createPut(1, true));
+    puts.add(createPut(1, true));
+    puts.add(createPut(1, false)); // <=== the bad apple, position 4
+    puts.add(createPut(1, true));
+    puts.add(createPut(1, false)); // <=== another bad apple, position 6
 
     Object[] res = new Object[puts.size()];
     try {
@@ -650,7 +685,7 @@ public class TestAsyncProcess {
     ht.ap.serverTrackerTimeout = 1;
 
 
-    Put p = createPut(true, false);
+    Put p = createPut(1, false);
     ht.setAutoFlush(false, false);
     ht.put(p);
 
@@ -679,14 +714,14 @@ public class TestAsyncProcess {
       HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
       hrls.add(hrl);
 
-      Get get = new Get(Bytes.toBytes(i * 10L + 5L));
+      Get get = new Get(Bytes.toBytes(i * 10L));
       gets.add(get);
     }
 
     HTable ht = new HTable();
     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
     ht.connection = con;
-    ht.batch((List) gets);
+    ht.batch(gets);
 
     Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
     Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server
@@ -696,23 +731,31 @@ public class TestAsyncProcess {
     for (int i =0; i<NB_REGS; i++){
       if (con.usedRegions[i]) nbReg++;
     }
-    Assert.assertTrue("nbReg=" + nbReg, nbReg > NB_REGS / 10);
+    Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
   }
 
 
   /**
-   * @param reg1    if true, creates a put on region 1, region 2 otherwise
+   * @param regCnt  the region: 1 to 3.
    * @param success if true, the put will succeed.
    * @return a put
    */
-  private Put createPut(boolean reg1, boolean success) {
+  private Put createPut(int regCnt, boolean success) {
     Put p;
     if (!success) {
       p = new Put(FAILS);
-    } else if (reg1) {
-      p = new Put(DUMMY_BYTES_1);
-    } else {
-      p = new Put(DUMMY_BYTES_2);
+    } else switch (regCnt){
+      case 1 :
+        p = new Put(DUMMY_BYTES_1);
+        break;
+      case 2:
+        p = new Put(DUMMY_BYTES_2);
+        break;
+      case 3:
+        p = new Put(DUMMY_BYTES_3);
+        break;
+      default:
+        throw new IllegalArgumentException("unknown " + regCnt);
     }
 
     p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);



Mime
View raw message