hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1525643 - in /hbase/trunk/hbase-client/src: main/java/org/apache/hadoop/hbase/client/AsyncProcess.java test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Date Mon, 23 Sep 2013 16:50:13 GMT
Author: nkeywal
Date: Mon Sep 23 16:50:12 2013
New Revision: 1525643

URL: http://svn.apache.org/r1525643
Log:
HBASE-9609  AsyncProcess doesn't increase all the counters when trying to limit the per-region
flow.

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1525643&r1=1525642&r2=1525643&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
(original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
Mon Sep 23 16:50:12 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -41,6 +42,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -408,14 +410,11 @@ class AsyncProcess<CResult> {
                               Map<HRegionLocation, MultiAction<Row>> actionsByServer,
                               final int numAttempt,
                               final HConnectionManager.ServerErrorTracker errorsByServer)
{
-
     // Send the queries and add them to the inProgress list
     for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet())
{
       final HRegionLocation loc = e.getKey();
       final MultiAction<Row> multi = e.getValue();
-      final String regionName = loc.getRegionInfo().getEncodedName();
-
-      incTaskCounters(regionName);
+      incTaskCounters(multi.getRegions());
 
       Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
         @Override
@@ -426,14 +425,15 @@ class AsyncProcess<CResult> {
             try {
               res = createCaller(callable).callWithoutRetries(callable);
             } catch (IOException e) {
-              LOG.warn("The call to the RS failed, we don't know where we stand, " + loc,
e);
+              LOG.warn("The call to the region server failed, we don't know where we stand,
" +
+                  loc.getServerName(), e);
               resubmitAll(initialActions, multi, loc, numAttempt + 1, e, errorsByServer);
               return;
             }
 
             receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
           } finally {
-            decTaskCounters(regionName);
+            decTaskCounters(multi.getRegions());
           }
         }
       });
@@ -443,8 +443,9 @@ class AsyncProcess<CResult> {
       } catch (RejectedExecutionException ree) {
         // This should never happen. But as the pool is provided by the end user, let's secure
         //  this a little.
-        decTaskCounters(regionName);
-        LOG.warn("The task was rejected by the pool. This is unexpected. " + loc, ree);
+        decTaskCounters(multi.getRegions());
+        LOG.warn("The task was rejected by the pool. This is unexpected." +
+            " Server is " + loc.getServerName(), ree);
         // We're likely to fail again, but this will increment the attempt counter, so it
will
         //  finish.
         resubmitAll(initialActions, multi, loc, numAttempt + 1, ree, errorsByServer);
@@ -540,7 +541,7 @@ class AsyncProcess<CResult> {
 
     if (toReplay.isEmpty()) {
       LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
-        initialActions.size() + "ops, NOT resubmitting, " + location);
+        initialActions.size() + "ops, NOT resubmitting, " + location.getServerName());
     } else {
       submit(initialActions, toReplay, numAttempt, true, errorsByServer);
     }
@@ -712,25 +713,31 @@ class AsyncProcess<CResult> {
   }
 
   /**
-   * incrementer the tasks counters for a given region. MT safe.
+   * increment the tasks counters for a given set of regions. MT safe.
    */
-  protected void incTaskCounters(String encodedRegionName) {
+  protected void incTaskCounters(Collection<byte[]> regions) {
     tasksSent.incrementAndGet();
 
-    AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
-    if (counterPerServer == null) {
-      taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
-      counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+    for (byte[] regBytes : regions) {
+      String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
+      AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+      if (counterPerServer == null) {
+        taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
+        counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+      }
+      counterPerServer.incrementAndGet();
     }
-    counterPerServer.incrementAndGet();
   }
 
   /**
    * Decrements the counters for a given region
    */
-  protected void decTaskCounters(String encodedRegionName) {
-    AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
-    counterPerServer.decrementAndGet();
+  protected void decTaskCounters(Collection<byte[]> regions) {
+    for (byte[] regBytes : regions) {
+      String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
+      AtomicInteger counterPerServer = taskCounterPerRegion.get(encodedRegionName);
+      counterPerServer.decrementAndGet();
+    }
 
     tasksDone.incrementAndGet();
     synchronized (tasksDone) {

Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1525643&r1=1525642&r2=1525643&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
(original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Mon Sep 23 16:50:12 2013
@@ -174,11 +174,11 @@ public class TestAsyncProcess {
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(true, true));
 
-    ap.incTaskCounters(hri1.getEncodedName());
+    ap.incTaskCounters(Arrays.asList(hri1.getRegionName()));
     ap.submit(puts, false);
     Assert.assertEquals(puts.size(), 1);
 
-    ap.decTaskCounters(hri1.getEncodedName());
+    ap.decTaskCounters(Arrays.asList(hri1.getRegionName()));
     ap.submit(puts, false);
     Assert.assertTrue(puts.isEmpty());
   }
@@ -349,7 +349,7 @@ public class TestAsyncProcess {
     final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
 
     for (int i = 0; i < 1000; i++) {
-      ap.incTaskCounters("dummy");
+      ap.incTaskCounters(Arrays.asList("dummy".getBytes()));
     }
 
     final Thread myThread = Thread.currentThread();
@@ -378,7 +378,7 @@ public class TestAsyncProcess {
       public void run() {
         Threads.sleep(sleepTime);
         while (ap.tasksDone.get() > 0) {
-          ap.decTaskCounters("dummy");
+          ap.decTaskCounters(Arrays.asList("dummy".getBytes()));
         }
       }
     };



Mime
View raw message