hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1332810 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Date Tue, 01 May 2012 20:47:16 GMT
Author: larsh
Date: Tue May  1 20:47:16 2012
New Revision: 1332810

URL: http://svn.apache.org/viewvc?rev=1332810&view=rev
Log:
HBASE-5897 prePut coprocessor hook causing substantial CPU usage

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1332810&r1=1332809&r2=1332810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue
May  1 20:47:16 2012
@@ -1861,10 +1861,12 @@ public class HRegion implements HeapSize
     T[] operations;
     int nextIndexToProcess = 0;
     OperationStatus[] retCodeDetails;
+    WALEdit[] walEditsFromCoprocessors;
 
     public BatchOperationInProgress(T[] operations) {
       this.operations = operations;
       this.retCodeDetails = new OperationStatus[operations.length];
+      this.walEditsFromCoprocessors = new WALEdit[operations.length];
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
     }
 
@@ -1901,14 +1903,21 @@ public class HRegion implements HeapSize
     BatchOperationInProgress<Pair<Put, Integer>> batchOp =
       new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
 
+    boolean initialized = false;
+
     while (!batchOp.isDone()) {
       checkReadOnly();
       checkResources();
 
       long newSize;
       startRegionOperation();
-      this.writeRequestsCount.increment();
+
       try {
+        if (!initialized) {
+          this.writeRequestsCount.increment();
+          doPrePutHook(batchOp);
+          initialized = true;
+        }
         long addedSize = doMiniBatchPut(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
@@ -1921,30 +1930,38 @@ public class HRegion implements HeapSize
     return batchOp.retCodeDetails;
   }
 
-  @SuppressWarnings("unchecked")
-  private long doMiniBatchPut(
-      BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException
{
-    final String tableName = getTableDesc().getNameAsString();
-
-    // The set of columnFamilies first seen.
-    Set<byte[]> cfSet = null;
-    // variable to note if all Put items are for the same CF -- metrics related
-    boolean cfSetConsistent = true;
-    long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
-
-    WALEdit walEdit = new WALEdit();
+  private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
+      throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
+    WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0; i < batchOp.operations.length; i++) {
         Pair<Put, Integer> nextPair = batchOp.operations[i];
         Put put = nextPair.getFirst();
         if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
           // pre hook says skip this Put
-          // mark as success and skip below
+          // mark as success and skip in doMiniBatchPut
           batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
         }
+        if (!walEdit.isEmpty()) {
+          batchOp.walEditsFromCoprocessors[i] = walEdit;
+          walEdit = new WALEdit();
+        }
       }
     }
+  }
+
+  @SuppressWarnings("unchecked")
+  private long doMiniBatchPut(
+      BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException
{
+
+    // The set of columnFamilies first seen.
+    Set<byte[]> cfSet = null;
+    // variable to note if all Put items are for the same CF -- metrics related
+    boolean cfSetConsistent = true;
+    long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
+
+    WALEdit walEdit = new WALEdit();
 
     MultiVersionConsistencyControl.WriteEntry w = null;
     long txid = 0;
@@ -2082,6 +2099,13 @@ public class HRegion implements HeapSize
 
         Put p = batchOp.operations[i].getFirst();
         if (!p.getWriteToWAL()) continue;
+        // Add WAL edits by CP
+        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
+        if (fromCP != null) {
+          for (KeyValue kv : fromCP.getKeyValues()) {
+            walEdit.add(kv);
+          }
+        }
         addFamilyMapToWALEdit(familyMaps[i], walEdit);
       }
 



Mime
View raw message