From: nkey...@apache.org
Subject: svn commit: r1533604 - in /hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client: AsyncProcess.java HTable.java
Date: Fri, 18 Oct 2013 19:34:19 GMT
Author: nkeywal
Date: Fri Oct 18 19:34:18 2013
New Revision: 1533604

URL: http://svn.apache.org/r1533604
Log:
HBASE-9768 Two issues in AsyncProcess

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1533604&r1=1533603&r2=1533604&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Fri Oct 18 19:34:18 2013
@@ -265,7 +265,18 @@ class AsyncProcess<CResult> {
       new HashMap<HRegionLocation, MultiAction<Row>>();
     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
 
+    long currentTaskCnt = tasksDone.get();
+    boolean alreadyLooped = false;
+
     do {
+      if (alreadyLooped){
+        // if, for whatever reason, we looped, we want to be sure that something has changed.
+        waitForNextTaskDone(currentTaskCnt);
+        currentTaskCnt = tasksDone.get();
+      } else {
+        alreadyLooped = true;
+      }
+
       // Wait until there is at least one slot for a new task.
       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
 
@@ -280,8 +291,9 @@ class AsyncProcess<CResult> {
         Row r = it.next();
         HRegionLocation loc = findDestLocation(r, 1, posInList);
 
-        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
-          // loc is null if there is an error such as meta not available.
+        if (loc == null) { // loc is null if there is an error such as meta not available.
+          it.remove();
+        } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
           Action<Row> action = new Action<Row>(r, ++posInList);
           retainedActions.add(action);
           addAction(loc, action, actionsByServer);
@@ -644,6 +656,7 @@ class AsyncProcess<CResult> {
     for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
         responses.getResults().entrySet()) {
 
+      boolean regionFailureRegistered = false;
       for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
         Object result = regionResult.getSecond();
 
@@ -652,8 +665,9 @@ class AsyncProcess<CResult> {
           throwable = (Throwable) result;
           Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
           Row row = correspondingAction.getAction();
-
-          if (failureCount++ == 0) { // We're doing this once per location.
+          failureCount++;
+          if (!regionFailureRegistered) { // We're doing this once per location.
+            regionFailureRegistered= true;
             hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
             if (errorsByServer != null) {
               errorsByServer.reportServerError(location);
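
The first hunk above addresses the busy-loop issue: submit() could go around its do/while loop again without any in-flight task having finished, and rows whose region location could not be resolved stayed in the list. With the patch, unlocatable rows are dropped via it.remove(), and every pass after the first blocks in waitForNextTaskDone(currentTaskCnt) until the tasksDone counter has advanced. The last two hunks make the failure bookkeeping match its comment: failureCount now counts every failed action, while the new regionFailureRegistered flag limits updateCachedLocations()/reportServerError() to once per region in the response, instead of only once for the first failure in the whole response. Below is a minimal, self-contained sketch of the progress-gate idea; ProgressGate, taskCompleted() and tasksDoneCount() are invented for illustration, and only the tasksDone/waitForNextTaskDone naming follows the diff.

import java.util.concurrent.atomic.AtomicLong;

// Self-contained sketch of the "only loop again after progress" gate the patch adds to
// AsyncProcess#submit. ProgressGate is an invented class; only the tasksDone /
// waitForNextTaskDone naming mirrors the diff.
public class ProgressGate {
  private final AtomicLong tasksDone = new AtomicLong(0);

  // Called by a callback thread when an in-flight batch finishes.
  public synchronized void taskCompleted() {
    tasksDone.incrementAndGet();
    notifyAll();
  }

  // Blocks until tasksDone has moved past the value observed before the previous pass.
  public synchronized void waitForNextTaskDone(long observedCount) throws InterruptedException {
    while (tasksDone.get() == observedCount) {
      wait(100);
    }
  }

  public long tasksDoneCount() {
    return tasksDone.get();
  }

  // Usage mirroring the patched submit loop: the first pass runs immediately; every later
  // pass first waits for at least one task to complete, so the loop cannot spin while the
  // task quota stays full and nothing changes.
  public static void main(String[] args) throws InterruptedException {
    ProgressGate gate = new ProgressGate();
    new Thread(() -> {
      try { Thread.sleep(200); } catch (InterruptedException ignored) { }
      gate.taskCompleted();
    }).start();

    long currentTaskCnt = gate.tasksDoneCount();
    boolean alreadyLooped = false;
    for (int pass = 0; pass < 2; pass++) {
      if (alreadyLooped) {
        gate.waitForNextTaskDone(currentTaskCnt);  // something must have finished before pass 2
        currentTaskCnt = gate.tasksDoneCount();
      } else {
        alreadyLooped = true;
      }
      System.out.println("pass " + pass + ", tasksDone=" + gate.tasksDoneCount());
    }
  }
}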

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1533604&r1=1533603&r2=1533604&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri Oct 18 19:34:18 2013
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.ipc.Paylo
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@@ -889,6 +887,7 @@ public class HTable implements HTableInt
    */
   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
     if (ap.hasError()){
+      writeAsyncBuffer.add(put);
       backgroundFlushCommits(true);
     }
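
Reading the doPut() hunk above: when a previous background flush has left an error pending, the put is now appended to writeAsyncBuffer before the synchronous flush, so it is sent (or reported) together with the already-buffered operations instead of being skipped when that flush throws. A tiny stand-alone sketch of that ordering, with invented names except writeAsyncBuffer:

import java.util.ArrayList;
import java.util.List;

// Tiny sketch of the patched doPut() ordering. DoPutSketch, pendingError and
// flushSynchronously() are invented stand-ins; only writeAsyncBuffer comes from the diff.
public class DoPutSketch {
  private final List<String> writeAsyncBuffer = new ArrayList<>();
  private boolean pendingError = true;   // pretend an earlier background flush failed

  void doPut(String put) {
    if (pendingError) {
      writeAsyncBuffer.add(put);         // new in this commit: buffer the put first
      flushSynchronously();              // then flush everything, including this put
      pendingError = false;
    }
    // non-error path (buffer the put, flush once the buffer is full) elided
  }

  private void flushSynchronously() {
    System.out.println("flushing " + writeAsyncBuffer);
    writeAsyncBuffer.clear();
  }

  public static void main(String[] args) {
    new DoPutSketch().doPut("row-1");    // prints: flushing [row-1]
  }
}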
 
@@ -907,25 +906,29 @@ public class HTable implements HTableInt
    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
   * If the is an error (max retried reach from a previous flush or bad operation), it tries to
    * send all operations in the buffer and sends an exception.
+   * @param synchronous - if true, sends all the writes and wait for all of them to finish before
+   *                     returning.
    */
   private void backgroundFlushCommits(boolean synchronous) throws
       InterruptedIOException, RetriesExhaustedWithDetailsException {
 
     try {
-      // If there is an error on the operations in progress, we don't add new operations.
-      if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
+      do {
         ap.submit(writeAsyncBuffer, true);
-      }
+      } while (synchronous && !writeAsyncBuffer.isEmpty());
 
-      if (synchronous || ap.hasError()) {
-        if (ap.hasError() && LOG.isDebugEnabled()) {
-          LOG.debug(tableName + ": One or more of the operations have failed -" +
-              " waiting for all operation in progress to finish (successfully or not)");
-        }
+      if (synchronous) {
         ap.waitUntilDone();
       }
 
       if (ap.hasError()) {
+        LOG.debug(tableName + ": One or more of the operations have failed -" +
+            " waiting for all operation in progress to finish (successfully or not)");
+        while (!writeAsyncBuffer.isEmpty()) {
+          ap.submit(writeAsyncBuffer, true);
+        }
+        ap.waitUntilDone();
+
         if (!clearBufferOnFail) {
          // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
           //  write buffer. This is a questionable feature kept here for backward compatibility
@@ -1186,12 +1189,9 @@ public class HTable implements HTableInt
    */
   @Override
  public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-    // We're looping, as if one region is overloaded we keep its operations in the buffer.
     // As we can have an operation in progress even if the buffer is empty, we call
     //  backgroundFlushCommits at least one time.
-    do {
-      backgroundFlushCommits(true);
-    } while (!writeAsyncBuffer.isEmpty());
+    backgroundFlushCommits(true);
   }
 
   /**
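
The remaining HTable hunks move the drain-and-retry loop into backgroundFlushCommits(): on the synchronous path it keeps calling ap.submit() until writeAsyncBuffer is empty and then waits for the in-flight operations, and on the error path it resubmits whatever is still buffered before surfacing the failure, which is why flushCommits() no longer needs its own do/while loop. The sketch below models only that control flow; BufferedWriterSketch and FakeAsyncProcess are invented stand-ins, not HBase classes, and error reporting is reduced to a placeholder exception.

import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down model of the patched flush flow. Only the shape of the logic
// follows the diff; none of these classes or methods is HBase API.
class FakeAsyncProcess {
  private int inFlight = 0;

  // Stand-in for AsyncProcess#submit: earlier submissions are assumed to have completed in
  // the background, then up to two buffered operations are accepted, possibly leaving some
  // behind (e.g. because their region is overloaded).
  void submit(List<String> buffer) {
    inFlight = 0;
    while (!buffer.isEmpty() && inFlight < 2) {
      System.out.println("submitting " + buffer.remove(0));
      inFlight++;
    }
  }

  void waitUntilDone() { inFlight = 0; }   // pretend all in-flight operations finished
  boolean hasError()   { return false; }   // error handling never triggers in this sketch
}

public class BufferedWriterSketch {
  private final FakeAsyncProcess ap = new FakeAsyncProcess();
  private final List<String> writeAsyncBuffer = new ArrayList<>();

  void put(String op) { writeAsyncBuffer.add(op); }

  // Mirrors the patched HTable#flushCommits: a single call is enough, because the
  // synchronous path below keeps submitting until the buffer is empty.
  void flushCommits() throws InterruptedIOException {
    backgroundFlushCommits(true);
  }

  private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException {
    do {
      ap.submit(writeAsyncBuffer);
    } while (synchronous && !writeAsyncBuffer.isEmpty());

    if (synchronous) {
      ap.waitUntilDone();
    }

    if (ap.hasError()) {
      // In the real patch: log, resubmit whatever is still buffered, wait for everything,
      // then throw RetriesExhaustedWithDetailsException (unless clearBufferOnFail keeps it).
      while (!writeAsyncBuffer.isEmpty()) {
        ap.submit(writeAsyncBuffer);
      }
      ap.waitUntilDone();
      throw new InterruptedIOException("stand-in for the error report");
    }
  }

  public static void main(String[] args) throws InterruptedIOException {
    BufferedWriterSketch table = new BufferedWriterSketch();
    for (int i = 0; i < 5; i++) {
      table.put("row-" + i);
    }
    table.flushCommits();   // drains all five operations in at most three submit() calls
  }
}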


