hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1298680 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Date Fri, 09 Mar 2012 00:53:55 GMT
Author: larsh
Date: Fri Mar  9 00:53:54 2012
New Revision: 1298680

URL: http://svn.apache.org/viewvc?rev=1298680&view=rev
Log:
HBASE-5541 Avoid holding the rowlock during HLog sync in HRegion.mutateRowWithLocks

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1298680&r1=1298679&r2=1298680&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri
Mar  9 00:53:54 2012
@@ -4216,6 +4216,10 @@ public class HRegion implements HeapSize
         }
       }
 
+      long txid = 0;
+      boolean walSyncSuccessful = false;
+      boolean locked = false;
+
       // 2. acquire the row lock(s)
       acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
@@ -4230,6 +4234,7 @@ public class HRegion implements HeapSize
 
       // 3. acquire the region lock
       this.updatesLock.readLock().lock();
+      locked = true;
 
       // 4. Get a mvcc write number
       MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
@@ -4258,10 +4263,12 @@ public class HRegion implements HeapSize
           }
         }
 
-        // 6. append/sync all edits at once
-        // TODO: Do batching as in doMiniBatchPut
-        this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit,
-            HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        // 6. append all edits at once (don't sync)
+        if (walEdit.size() > 0) {
+          txid = this.log.appendNoSync(regionInfo,
+              this.htableDescriptor.getName(), walEdit,
+              HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        }
 
         // 7. apply to memstore
         long addedSize = 0;
@@ -4269,32 +4276,79 @@ public class HRegion implements HeapSize
           addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
         }
         flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-      } finally {
-        // 8. roll mvcc forward
-        mvcc.completeMemstoreInsert(w);
 
-        // 9. release region lock
+        // 8. release region and row lock(s)
         this.updatesLock.readLock().unlock();
-      }
-      // 10. run all coprocessor post hooks, after region lock is released
-      if (coprocessorHost != null) {
-        for (Mutation m : mutations) {
-          if (m instanceof Put) {
-            coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
-          } else if (m instanceof Delete) {
-            coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+        locked = false;
+        if (acquiredLocks != null) {
+          for (Integer lid : acquiredLocks) {
+            releaseRowLock(lid);
           }
+          acquiredLocks = null;
         }
-      }
-    } finally {
-      if (acquiredLocks != null) {
-        // 11. release the row lock
-        for (Integer lid : acquiredLocks) {
-          releaseRowLock(lid);
+
+        // 9. sync WAL if required
+        if (walEdit.size() > 0 &&
+            (this.regionInfo.isMetaRegion() ||
+             !this.htableDescriptor.isDeferredLogFlush())) {
+          this.log.sync(txid);
+        }
+        walSyncSuccessful = true;
+
+        // 10. advance mvcc
+        mvcc.completeMemstoreInsert(w);
+        w = null;
+
+        // 11. run coprocessor post host hooks
+        // after the WAL is sync'ed and all locks are released
+        // (similar to doMiniBatchPut)
+        if (coprocessorHost != null) {
+          for (Mutation m : mutations) {
+            if (m instanceof Put) {
+              coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
+            } else if (m instanceof Delete) {
+              coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+            }
+          }
+        }
+      } finally {
+        // 12. clean up if needed
+        if (!walSyncSuccessful) {
+          int kvsRolledback = 0;
+          for (Mutation m : mutations) {
+            for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
+                .entrySet()) {
+              List<KeyValue> kvs = e.getValue();
+              byte[] family = e.getKey();
+              Store store = getStore(family);
+              // roll back each kv
+              for (KeyValue kv : kvs) {
+                store.rollback(kv);
+                kvsRolledback++;
+              }
+            }
+          }
+          LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback
+              + " KeyValues");
+        }
+
+        if (w != null) {
+          mvcc.completeMemstoreInsert(w);
+        }
+
+        if (locked) {
+          this.updatesLock.readLock().unlock();
+        }
+
+        if (acquiredLocks != null) {
+          for (Integer lid : acquiredLocks) {
+            releaseRowLock(lid);
+          }
         }
       }
+    } finally {
       if (flush) {
-        // 12. Flush cache if needed. Do it outside update lock.
+        // 13. Flush cache if needed. Do it outside update lock.
         requestFlush();
       }
       closeRegionOperation();



Mime
View raw message