hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1365692 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client: HConnection.java HConnectionManager.java HTableMultiplexer.java
Date Wed, 25 Jul 2012 18:33:18 GMT
Author: mbautin
Date: Wed Jul 25 18:33:18 2012
New Revision: 1365692

URL: http://svn.apache.org/viewvc?rev=1365692&view=rev
Log:
[HBASE-6452] get rid of processSingleMultiPut.

Author: aaiyer

Summary:
processSingleMultiPut is very similar to processBatchOfMultiPut. Instead
of having multiple copies of the same code, try to integrate this with
the other function.

Test Plan: run tests on MR

Reviewers: liyintang, pkhemani

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D526344

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1365692&r1=1365691&r2=1365692&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Jul 25 18:33:18 2012
@@ -312,11 +312,15 @@ public interface HConnection extends Clo
    * 
   * Also it will return the list of failed put among the MultiPut request or return null if all
    * puts are sent to the HRegionServer successfully.
-   * @param mput The MultiPut request
-   * @return the list of failed put among the MultiPut request, otherwise return null 
+   * @param mputs The list of MultiPut requests
+   * @param tableName
+   * @param options The RPC options to be used while communicating with the HRegionServer
+   * @return the list of failed put among the requests, otherwise return null
    *         if all puts are sent to the HRegionServer successfully.
+   * @throws IOException
    */
-  public List<Put> processSingleMultiPut(MultiPut mput, HBaseRPCOptions options);
+  public List<Put> processListOfMultiPut(List<MultiPut> mputs,
+      final byte[] tableName, HBaseRPCOptions options) throws IOException;
   
   /**
    * Delete the cached location

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1365692&r1=1365691&r2=1365692&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Jul 25 18:33:18 2012
@@ -2017,14 +2017,11 @@ public class HConnectionManager {
      * @param tableName The table name for the put request
      * @throws IOException
      */
-    private void processBatchOfMultiPut(List<Put> list,
-        List<Put> failed, final byte[] tableName, HBaseRPCOptions options) throws IOException {
-      // XXX error handling should mirror getRegionServerWithRetries()
-      Collections.sort(list);
+    private List<MultiPut> splitPutsIntoMultiPuts(List<Put> list,
+        final byte[] tableName, HBaseRPCOptions options) throws IOException {
       Map<HServerAddress, MultiPut> regionPuts =
           new HashMap<HServerAddress, MultiPut>();
-      // step 1:
-      //  break up into regionserver-sized chunks and build the data structs
+
       for ( Put put : list ) {
         byte [] row = put.getRow();
 
@@ -2040,30 +2037,29 @@ public class HConnectionManager {
         mput.add(regionName, put);
       }
 
-      // step 2:
-      //  make the requests
-      // Discard the map, just use a list now, makes error recovery easier.
-      List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
+      return new ArrayList<MultiPut>(regionPuts.values());
+    }
+
+    public List<Put> processListOfMultiPut(List<MultiPut> multiPuts,
+      final byte[] givenTableName, HBaseRPCOptions options) throws IOException {
+      List<Put> failed = null;
 
       List<Future<MultiPutResponse>> futures =
-          new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+          new ArrayList<Future<MultiPutResponse>>(multiPuts.size());
       boolean singleServer = (multiPuts.size() == 1);
       for ( MultiPut put : multiPuts ) {
         Callable<MultiPutResponse> callable = createPutCallable(put.address,
-            put, tableName, options);
+            put, options);
         Future<MultiPutResponse> task;
         if (singleServer) {
           task = new FutureTask<MultiPutResponse>(callable);
           ((FutureTask<MultiPutResponse>)task).run();
-        }
-        else {
+        } else {
             task = HTable.multiActionThreadPool.submit(callable);
         }
         futures.add(task);
       }
 
-      // step 3:
-      //  collect the failures and tries from step 1.
       for (int i = 0; i < futures.size(); i++ ) {
         Future<MultiPutResponse> future = futures.get(i);
         MultiPut request = multiPuts.get(i);
@@ -2091,57 +2087,28 @@ public class HConnectionManager {
             // failed
             LOG.debug("Failed all for region: " +
                 Bytes.toStringBinary(region) + ", removing from cache");
+
+            // HTableMultiplexer does not specify the tableName, as it is
+            // shared across different tables. In that case, let us try
+            // and derive the tableName from the regionName.
+            byte[] tableName = (givenTableName != null)? givenTableName
+                : HRegionInfo.parseRegionName(region)[0];
+
+
             deleteCachedLocation(tableName, lst.get(0).getRow(), request.address);
 
+            if (failed == null) failed = new ArrayList<Put>();
             failed.addAll(e.getValue());
           } else if (result != HConstants.MULTIPUT_SUCCESS) {
             // some failures
+            if (failed == null) failed = new ArrayList<Put>();
             failed.addAll(lst.subList(result, lst.size()));
             LOG.debug("Failed past " + result + " for region: " +
                 Bytes.toStringBinary(region) + ", removing from cache");
           }
         }
       }
-    }
-
-    /** {@inheritDoc} */
-    public List<Put> processSingleMultiPut(MultiPut mput, HBaseRPCOptions options) {
-      if (mput == null || mput.address == null)
-        return null;
-      List<Put> failedPuts = null;
-
-      Future<MultiPutResponse> future = HTable.multiActionThreadPool.submit(
-          createPutCallable(mput.address, mput, null, options));
-
-      try {
-        MultiPutResponse resp = future.get();
-        // Process the response for each region
-        for (Map.Entry<byte[], List<Put>> e : mput.puts.entrySet()) {
-          Integer result = resp.getAnswer(e.getKey());
-          if (result == null) {
-            // Prepare all the failed puts for retry
-            if (failedPuts == null) {
-              failedPuts = new ArrayList<Put>();
-            }
-            failedPuts.addAll(e.getValue());
-
-          } else if (result != HConstants.MULTIPUT_SUCCESS) {
-            // Prepared the failed puts for retry
-            if (failedPuts == null) {
-              failedPuts = new ArrayList<Put>();
-            }
-            List<Put> lst = e.getValue();
-            // In case of a failure the return result is the index (non-inclusive) up
-            // to which the operations succeded.
-            failedPuts.addAll(lst.subList(result, lst.size()));
-          } 
-        }
-      } catch (Exception e) {
-        LOG.error("Failed all puts request to " + mput.address.getHostNameWithPort() +
-            " because of " + e);
-        return (List<Put>) mput.allPuts();
-      }
-      return failedPuts;
+      return failed;
     }
 
     /**
@@ -2169,11 +2136,12 @@ public class HConnectionManager {
 
       int tries;
       for ( tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
-        List<Put> failed = new ArrayList<Put>();
-        this.processBatchOfMultiPut(list, failed, tableName, options);
+        List<Put> failed;
+        List<MultiPut> multiPuts = this.splitPutsIntoMultiPuts(list, tableName, options);
+        failed = this.processListOfMultiPut(multiPuts, tableName, options);
 
         list.clear();
-        if (!failed.isEmpty()) {
+        if (failed != null && !failed.isEmpty()) {
           // retry the failed ones after sleep
           list.addAll(failed);
 
@@ -2207,12 +2175,12 @@ public class HConnectionManager {
 
     private Callable<MultiPutResponse> createPutCallable(
         final HServerAddress address, final MultiPut puts,
-        final byte [] tableName, final HBaseRPCOptions options) {
+        final HBaseRPCOptions options) {
       final HConnection connection = this;
       return new Callable<MultiPutResponse>() {
         public MultiPutResponse call() throws IOException {
           return getRegionServerWithoutRetries(
-              new ServerCallable<MultiPutResponse>(connection, tableName, null, options) {
+              new ServerCallable<MultiPutResponse>(connection, null, null, options) {
                 public MultiPutResponse call() throws IOException {
                   MultiPutResponse resp = server.multiPut(puts);
                   resp.request = puts;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1365692&r1=1365691&r2=1365692&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Wed Jul 25 18:33:18 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -345,8 +346,6 @@ public class HTableMultiplexer {
       Put failedPut = failedPutStatus.getPut();
       // The currentPut is failed. So get the table name for the currentPut.
       byte[] tableName = failedPutStatus.getRegionInfo().getTableDesc().getName();
-      // Clear the cached location for the failed puts
-      this.connection.deleteCachedLocation(tableName, failedPut.getRow(), oldLoc);
       // Decrease the retry count
       int retryCount = failedPutStatus.getRetryCount() - 1;
       
@@ -404,7 +403,7 @@ public class HTableMultiplexer {
             }
             
             // Process this multiput request
-            List<Put> failed = connection.processSingleMultiPut(mput, options);
+            List<Put> failed = connection.processListOfMultiPut(Arrays.asList(mput), null, options);
             if (failed != null) {
               if (failed.size() == processingList.size()) {
                 // All the puts for this region server are failed. Going to retry it later



Mime
View raw message