hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1338352 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Date Mon, 14 May 2012 19:19:36 GMT
Author: mbautin
Date: Mon May 14 19:19:35 2012
New Revision: 1338352

URL: http://svn.apache.org/viewvc?rev=1338352&view=rev
Log:
[fb89] [HBASE-5941] Allow multiDelete operations to grab locks aggressively

Author: aaiyer

Test Plan: mvn test on mr

Reviewers: kannan, mbautin, liyintang

Reviewed By: kannan

CC: HBase Diffs Facebook Group

Differential Revision: https://reviews.facebook.net/D3123

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1338352&r1=1338351&r2=1338352&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon May 14 19:19:35 2012
@@ -1649,7 +1649,7 @@ public class HRegion implements HeapSize
   /*
    * @param delete The passed delete is modified by this method. WARNING!
    */
-  private void prepareDelete(Delete delete) throws IOException {
+  private void prepareDeleteFamilyMap(Delete delete) throws IOException {
     // Check to see if this is a deleteRow insert
     if(delete.getFamilyMap().isEmpty()){
       for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
@@ -1707,7 +1707,7 @@ public class HRegion implements HeapSize
       lid = getLock(lockid, row, true);
 
       // All edits for the given row (across all column families) must happen atomically.
-      prepareDelete(delete);
+      prepareDeleteFamilyMap(delete);
       delete(delete.getFamilyMap(), writeToWAL);
 
     } finally {
@@ -1919,23 +1919,25 @@ public class HRegion implements HeapSize
    */
   public OperationStatusCode[] put(Put[] puts) throws IOException {
     @SuppressWarnings("unchecked")
-    Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
+    Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
 
     for (int i = 0; i < puts.length; i++) {
-      putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
+      putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
     }
-    return put(putsAndLocks);
+    return batchMutateWithLocks(putsAndLocks, "multiput_");
   }
 
   /**
    * Perform a batch of puts.
    * @param putsAndLocks the list of puts paired with their requested lock IDs.
+   * @param methodName "multiput_/multidelete_" to update metrics correctly.
    * @throws IOException
    */
-  public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
+  public OperationStatusCode[] batchMutateWithLocks(Pair<Mutation, Integer>[] putsAndLocks,
+      String methodName) throws IOException {
     this.writeRequests.incrTotalRequstCount();
-    BatchOperationInProgress<Pair<Put, Integer>> batchOp =
-      new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
+    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
+      new BatchOperationInProgress<Pair<Mutation,Integer>>(putsAndLocks);
 
     while (!batchOp.isDone()) {
       checkReadOnly();
@@ -1944,7 +1946,7 @@ public class HRegion implements HeapSize
       long newSize;
       splitsAndClosesLock.readLock().lock();
       try {
-        long addedSize = doMiniBatchPut(batchOp);
+        long addedSize = doMiniBatchOp(batchOp, methodName);
         newSize = this.incMemoryUsage(addedSize);
       } finally {
         splitsAndClosesLock.readLock().unlock();
@@ -1957,7 +1959,8 @@ public class HRegion implements HeapSize
     return batchOp.retCodes;
   }
 
-  private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+  private long doMiniBatchOp(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+      String methodNameForMetricsUpdate) throws IOException {
     String signature = null;
     // variable to note if all Put items are for the same CF -- metrics related
     boolean isSignatureClear = true;
@@ -1979,25 +1982,27 @@ public class HRegion implements HeapSize
       // ----------------------------------
       int numReadyToWrite = 0;
       while (lastIndexExclusive < batchOp.operations.length) {
-        Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
-        Put put = nextPair.getFirst();
+        Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
+        Mutation op = nextPair.getFirst();
         Integer providedLockId = nextPair.getSecond();
 
         // Check the families in the put. If bad, skip this one.
-        try {
-          checkFamilies(put.getFamilyMap().keySet());
-          checkTimestamps(put, now);
-        } catch (DoNotRetryIOException dnrioe) {
-          LOG.warn("Sanity check error in batch put", dnrioe);
-          batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE;
-          lastIndexExclusive++;
-          continue;
+        if (op instanceof Put) {
+          try {
+            checkFamilies(op.getFamilyMap().keySet());
+            checkTimestamps(op, now);
+          } catch (DoNotRetryIOException dnrioe) {
+            LOG.warn("Sanity check error in batch processing", dnrioe);
+            batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE;
+            lastIndexExclusive++;
+            continue;
+          }
         }
 
         // If we haven't got any rows in our batch, we should block to
         // get the next one.
         boolean shouldBlock = numReadyToWrite == 0;
-        Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
+        Integer acquiredLockId = getLock(providedLockId, op.getRow(), shouldBlock);
         if (acquiredLockId == null) {
           // We failed to grab another lock
           assert !shouldBlock : "Should never fail to get lock when blocking";
@@ -2014,13 +2019,13 @@ public class HRegion implements HeapSize
         // all else, designate failure signature and mark as unclear
         if (null == signature) {
           signature = SchemaMetrics.generateSchemaMetricsPrefix(
-              this.getTableDesc().getNameAsString(), put.getFamilyMap()
+              this.getTableDesc().getNameAsString(), op.getFamilyMap()
               .keySet());
         } else {
           if (isSignatureClear) {
             if (!signature.equals(SchemaMetrics.generateSchemaMetricsPrefix(
                 this.getTableDesc().getNameAsString(),
-                put.getFamilyMap().keySet()))) {
+                op.getFamilyMap().keySet()))) {
               isSignatureClear = false;
               signature = SchemaMetrics.CF_UNKNOWN_PREFIX;
             }
@@ -2030,7 +2035,6 @@ public class HRegion implements HeapSize
       // We've now grabbed as many puts off the list as we can
       assert numReadyToWrite > 0;
 
-
       this.updatesLock.readLock().lock();
       locked = true;
 
@@ -2038,9 +2042,17 @@ public class HRegion implements HeapSize
       // STEP 2. Update any LATEST_TIMESTAMP timestamps
       // ----------------------------------
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        updateKVTimestamps(
-            batchOp.operations[i].getFirst().getFamilyMap().values(),
-            byteNow);
+        Mutation op = batchOp.operations[i].getFirst();
+        
+        if (op instanceof Put) {
+          updateKVTimestamps(
+              op.getFamilyMap().values(),
+              byteNow);
+        }
+        else if (op instanceof Delete) {
+          prepareDeleteFamilyMap((Delete)op);
+          prepareDeleteTimestamps(op.getFamilyMap(), byteNow);
+        }
       }
 
       // ------------------------------------
@@ -2051,9 +2063,9 @@ public class HRegion implements HeapSize
         // Skip puts that were determined to be invalid during preprocessing
         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
 
-        Put p = batchOp.operations[i].getFirst();
-        if (!p.getWriteToWAL()) continue;
-        addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
+        Mutation op = batchOp.operations[i].getFirst();
+        if (!op.getWriteToWAL()) continue;
+        addFamilyMapToWALEdit(op.getFamilyMap(), walEdit);
       }
 
       // Append the edit to WAL
@@ -2063,12 +2075,13 @@ public class HRegion implements HeapSize
       // ------------------------------------
       // STEP 4. Write back to memstore
       // ----------------------------------
+
       long addedSize = 0;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
 
-        Put p = batchOp.operations[i].getFirst();
-        addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
+        Mutation op = batchOp.operations[i].getFirst();
+        addedSize += applyFamilyMapToMemstore(op.getFamilyMap());
         batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
       }
       success = true;
@@ -2086,7 +2099,7 @@ public class HRegion implements HeapSize
       if (null == signature) {
         signature = SchemaMetrics.CF_BAD_FAMILY_PREFIX;
       }
-      HRegion.incrTimeVaryingMetric(signature + "multiput_", after - now);
+      HRegion.incrTimeVaryingMetric(signature + methodNameForMetricsUpdate, after - now);
 
       if (!success) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2154,7 +2167,7 @@ public class HRegion implements HeapSize
             put(((Put)w).getFamilyMap(), writeToWAL);
           } else {
             Delete d = (Delete)w;
-            prepareDelete(d);
+            prepareDeleteFamilyMap(d);
             delete(d.getFamilyMap(), writeToWAL);
           }
           return true;
@@ -2372,8 +2385,8 @@ public class HRegion implements HeapSize
       checkFamily(family);
     }
   }
-  private void checkTimestamps(Put p, long now) throws DoNotRetryIOException {
-    checkTimestamps(p.getFamilyMap(), now);
+  private void checkTimestamps(Mutation op, long now) throws DoNotRetryIOException {
+    checkTimestamps(op.getFamilyMap(), now);
   }
 
   private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
@@ -3713,7 +3726,7 @@ public class HRegion implements HeapSize
             updateKVTimestamps(familyMap.values(), byteNow);
           } else if (m instanceof Delete) {
             Delete d = (Delete) m;
-            prepareDelete(d);
+            prepareDeleteFamilyMap(d);
             Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
             prepareDeleteTimestamps(familyMap, byteNow);
           } else {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1338352&r1=1338351&r2=1338352&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon May 14 19:19:35 2012
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.YouAreDea
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -764,6 +765,7 @@ public class HRegionServer implements HR
    * Add to the passed <code>msgs</code> messages to pass to the master.
    * @param msgs Current outboundMsgs array; we'll add messages to this List.
    */
+  // Amit: Warning n^2 loop. Can bring it down to O(n) using a hash map.
   private void addOutboundMsgs(final List<HMsg> msgs) {
     if (msgs.isEmpty()) {
       this.outboundMsgs.drainTo(msgs);
@@ -2107,6 +2109,13 @@ public class HRegionServer implements HR
   @Override
   public int put(final byte[] regionName, final List<Put> puts)
   throws IOException {
+    return applyMutations(regionName, puts, "multiput_");
+  }
+  
+  private int applyMutations(final byte[] regionName, 
+      final List<? extends Mutation> mutations,
+      String methodName)
+  throws IOException {
     checkOpen();
     HRegion region = null;
     try {
@@ -2116,16 +2125,17 @@ public class HRegionServer implements HR
       }
 
       @SuppressWarnings("unchecked")
-      Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+      Pair<Mutation, Integer>[] opWithLocks = new Pair[mutations.size()];
 
       int i = 0;
-      for (Put p : puts) {
+      for (Mutation p : mutations) {
         Integer lock = getLockFromId(p.getLockId());
-        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+        opWithLocks[i++] = new Pair<Mutation, Integer>(p, lock);
       }
 
-      this.requestCount.addAndGet(puts.size());
-      OperationStatusCode[] codes = region.put(putsWithLocks);
+      this.requestCount.addAndGet(mutations.size());
+      OperationStatusCode[] codes = region.batchMutateWithLocks(opWithLocks,
+          methodName);
       for (i = 0; i < codes.length; i++) {
         if (codes[i] != OperationStatusCode.SUCCESS)
           return i;
@@ -2383,33 +2393,7 @@ public class HRegionServer implements HR
   @Override
   public int delete(final byte[] regionName, final List<Delete> deletes)
   throws IOException {
-    // Count of Deletes processed.
-    int i = 0;
-    checkOpen();
-    HRegion region = null;
-    try {
-      boolean writeToWAL = true;
-      region = getRegion(regionName);
-      if (!region.getRegionInfo().isMetaTable()) {
-        this.cacheFlusher.reclaimMemStoreMemory();
-      }
-      int size = deletes.size();
-      Integer[] locks = new Integer[size];
-      for (Delete delete: deletes) {
-        this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(delete.getLockId());
-        region.delete(delete, locks[i], writeToWAL);
-        i++;
-      }
-    } catch (WrongRegionException ex) {
-      LOG.debug("Batch deletes: " + i, ex);
-      return i;
-    } catch (NotServingRegionException ex) {
-      return i;
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
-    return -1;
+    return applyMutations(regionName, deletes, "multidelete_");
   }
 
   @Override

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1338352&r1=1338351&r2=1338352&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon May 14 19:19:35 2012
@@ -430,7 +430,7 @@ public class TestHRegion extends HBaseTe
       putsAndLocks.add(pair);
     }
 
-    codes = region.put(putsAndLocks.toArray(new Pair[0]));
+    codes = region.batchMutateWithLocks(putsAndLocks.toArray(new Pair[0]), "multiput_");
     LOG.info("...performed put");
     for (int i = 0; i < 10; i++) {
       assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :



Mime
View raw message