hbase-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r1134478 - in /hbase/branches/0.90: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/util/
Date: Fri, 10 Jun 2011 23:29:40 GMT
Author: stack
Date: Fri Jun 10 23:29:40 2011
New Revision: 1134478

URL: http://svn.apache.org/viewvc?rev=1134478&view=rev
Log:
HBASE-3894 Thread contention over row locks set monitor
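
The change below removes the single monitor that every row-lock operation previously synchronized on (a TreeSet of locked rows plus a HashMap of lock ids) and replaces it with a ConcurrentHashMap keyed by the new HashedBytes wrapper, where each locked row holds a CountDownLatch that waiting threads park on. A minimal standalone sketch of that pattern, distilled from the HRegion hunk further down (class and method names here are illustrative, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    // Hypothetical distillation of the per-row latch pattern; not HBase code.
    class RowLockSketch {
      private final ConcurrentHashMap<String, CountDownLatch> lockedRows =
          new ConcurrentHashMap<String, CountDownLatch>();

      /** Block until this thread owns the row; no shared monitor is contended. */
      void lockRow(String row) throws InterruptedException {
        CountDownLatch myLatch = new CountDownLatch(1);
        while (true) {
          CountDownLatch existing = lockedRows.putIfAbsent(row, myLatch);
          if (existing == null) {
            return;             // our latch went in: lock acquired
          }
          existing.await();     // wait for the current holder, then retry
        }
      }

      /** Release the row and wake every thread parked on its latch. */
      void unlockRow(String row) {
        CountDownLatch latch = lockedRows.remove(row);
        if (latch != null) {
          latch.countDown();
        }
      }
    }

Contention thus moves from one JVM monitor shared by all rows to per-entry operations on the concurrent map, which is the point of the change.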

Added:
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
Modified:
    hbase/branches/0.90/CHANGES.txt
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java

Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1134478&r1=1134477&r2=1134478&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Fri Jun 10 23:29:40 2011
@@ -28,6 +28,7 @@ Release 0.90.4 - Unreleased
                and some keyvalue is outdated (Zhou Shuaifeng)
    HBASE-3976  Disable block cache on compactions (Karthik Sankarachary)
    HBASE-3892  Table can't disable (Gao Jinchao)
+   HBASE-3894  Thread contention over row locks set monitor (Dave Latham)
 
   IMPROVEMENT
    HBASE-3882  hbase-config.sh needs to be updated so it can auto-detects the

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1134478&r1=1134477&r2=1134478&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Jun 10 23:29:40 2011
@@ -1,5 +1,5 @@
 /*
- * Copyright 2010 The Apache Software Foundation
+ * Copyright 2011 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -36,11 +36,12 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -87,7 +89,6 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Lists;
@@ -144,22 +145,16 @@ public class HRegion implements HeapSize
   // Members
   //////////////////////////////////////////////////////////////////////////////
 
-  private final Set<byte[]> lockedRows =
-    new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-  private final Map<Integer, byte []> lockIds =
-    new HashMap<Integer, byte []>();
-  private int lockIdGenerator = 1;
+  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows = 
+    new ConcurrentHashMap<HashedBytes, CountDownLatch>();
+  private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
+    new ConcurrentHashMap<Integer, HashedBytes>();
+  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
   static private Random rand = new Random();
 
   protected final Map<byte [], Store> stores =
     new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);
 
-  //These variable are just used for getting data out of the region, to test on
-  //client side
-  // private int numStores = 0;
-  // private int [] storeSize = null;
-  // private byte [] name = null;
-
   final AtomicLong memstoreSize = new AtomicLong(0);
 
   /**
@@ -218,6 +213,9 @@ public class HRegion implements HeapSize
     boolean isFlushRequested() {
       return this.flushRequested;
     }
+
+    static final long HEAP_SIZE = ClassSize.align(
+        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
   }
 
   final WriteState writestate = new WriteState();
@@ -2088,41 +2086,41 @@ public class HRegion implements HeapSize
    *        null if unavailable.
    */
   private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
-  throws IOException {
+      throws IOException {
     checkRow(row);
     startRegionOperation();
     try {
-      synchronized (lockedRows) {
-        while (lockedRows.contains(row)) {
+      HashedBytes rowKey = new HashedBytes(row);
+      CountDownLatch rowLatch = new CountDownLatch(1);
+      
+      // loop until we acquire the row lock (unless !waitForLock)
+      while (true) {
+        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
+        if (existingLatch == null) {
+          break;
+        } else {
+          // row already locked
           if (!waitForLock) {
             return null;
           }
           try {
-            lockedRows.wait();
+            existingLatch.await();
           } catch (InterruptedException ie) {
             // Empty
           }
         }
-        // generate a new lockid. Attempt to insert the new [lockid, row].
-        // if this lockid already exists in the map then revert and retry
-        // We could have first done a lockIds.get, and if it does not exist only
-        // then do a lockIds.put, but the hope is that the lockIds.put will
-        // mostly return null the first time itself because there won't be
-        // too many lockId collisions.
-        byte [] prev = null;
-        Integer lockId = null;
-        do {
-          lockId = new Integer(lockIdGenerator++);
-          prev = lockIds.put(lockId, row);
-          if (prev != null) {
-            lockIds.put(lockId, prev);    // revert old value
-            lockIdGenerator = rand.nextInt(); // generate new start point
-          }
-        } while (prev != null);
-
-        lockedRows.add(row);
-        lockedRows.notifyAll();
-        return lockId;
+      }
+       
+      // loop until we generate an unused lock id
+      while (true) {
+        Integer lockId = lockIdGenerator.incrementAndGet();
+        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
+        if (existingRowKey == null) {
+          return lockId;
+        } else {
+          // lockId already in use, jump generator to a new spot
+          lockIdGenerator.set(rand.nextInt());
+        }
       }
     } finally {
       closeRegionOperation();
@@ -2134,36 +2132,37 @@ public class HRegion implements HeapSize
    * @param lockid
    * @return Row that goes with <code>lockid</code>
    */
-  byte [] getRowFromLock(final Integer lockid) {
-    synchronized (lockedRows) {
-      return lockIds.get(lockid);
-    }
+  byte[] getRowFromLock(final Integer lockid) {
+    HashedBytes rowKey = lockIds.get(lockid);
+    return rowKey == null ? null : rowKey.getBytes();
   }
 
   /**
    * Release the row lock!
-   * @param lockid  The lock ID to release.
+   * @param lockId  The lock ID to release.
    */
-  void releaseRowLock(final Integer lockid) {
-    synchronized (lockedRows) {
-      byte[] row = lockIds.remove(lockid);
-      lockedRows.remove(row);
-      lockedRows.notifyAll();
+  void releaseRowLock(final Integer lockId) {
+    HashedBytes rowKey = lockIds.remove(lockId);
+    if (rowKey == null) {
+      LOG.warn("Release unknown lockId: " + lockId);
+      return;
     }
+    CountDownLatch rowLatch = lockedRows.remove(rowKey);
+    if (rowLatch == null) {
+      LOG.error("Release of a row that is not locked, lockId: " + lockId + " row: "
+          + rowKey);
+      return;
+    }
+    rowLatch.countDown();
   }
 
   /**
    * See if row is currently locked.
-   * @param lockid
+   * @param lockId
    * @return boolean
    */
-  boolean isRowLocked(final Integer lockid) {
-    synchronized (lockedRows) {
-      if (lockIds.get(lockid) != null) {
-        return true;
-      }
-      return false;
-    }
+  boolean isRowLocked(final Integer lockId) {
+    return lockIds.containsKey(lockId);
   }
 
   /**
@@ -2289,6 +2288,7 @@ public class HRegion implements HeapSize
       }
     }
 
+    @Override
     public synchronized boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
       if (this.filterClosed) {
@@ -2316,6 +2316,7 @@ public class HRegion implements HeapSize
       }
     }
 
+    @Override
     public synchronized boolean next(List<KeyValue> outResults)
         throws IOException {
       // apply the batching limit by default
@@ -2410,6 +2411,7 @@ public class HRegion implements HeapSize
               currentRow, 0, currentRow.length) <= isScan);
     }
 
+    @Override
     public synchronized void close() {
       if (storeHeap != null) {
         storeHeap.close();
@@ -3191,29 +3193,32 @@ public class HRegion implements HeapSize
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
-      (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + ClassSize.ARRAY +
-      (22 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
-
-  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      (ClassSize.OBJECT * 2) + (2 * ClassSize.ATOMIC_BOOLEAN) +
-      ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER +
-
-      // Using TreeMap for TreeSet
-      ClassSize.TREEMAP +
-
-      // Using TreeMap for HashMap
-      ClassSize.TREEMAP +
-
-      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
-      ClassSize.align(ClassSize.OBJECT +
-        (5 * Bytes.SIZEOF_BOOLEAN)) +
-        (3 * ClassSize.REENTRANT_LOCK));
+      ClassSize.OBJECT + // this
+      (4 * Bytes.SIZEOF_LONG) + // memstoreFlushSize, lastFlushTime, blockingMemStoreSize, threadWakeFrequency
+      (2 * Bytes.SIZEOF_BOOLEAN) + // forceMajorCompaction, splitRequest 
+      ClassSize.ARRAY + // splitPoint
+      (23 * ClassSize.REFERENCE));
+
+  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
+      ClassSize.OBJECT + // closeLock
+      (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
+      ClassSize.ATOMIC_LONG + // memStoreSize 
+      ClassSize.ATOMIC_INTEGER + // lockIdGenerator
+      (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds
+      WriteState.HEAP_SIZE + // writestate
+      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
+      (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
+      ClassSize.ARRAYLIST + // recentFlushes
+      ReadWriteConsistencyControl.FIXED_SIZE // rwcc
+      ;
 
+  @Override
   public long heapSize() {
     long heapSize = DEEP_OVERHEAD;
     for(Store store : this.stores.values()) {
       heapSize += store.heapSize();
     }
+    // this does not take into account row locks, recent flushes, rwcc entries
     return heapSize;
   }
 

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=1134478&r1=1134477&r2=1134478&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Fri Jun 10 23:29:40 2011
@@ -20,7 +20,9 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * Manages the read/write consistency within memstore. This provides
@@ -158,4 +160,10 @@ public class ReadWriteConsistencyControl
       return this.writeNumber;
     }
   }
+  
+  public static final long FIXED_SIZE = ClassSize.align(
+      ClassSize.OBJECT + 
+      2 * Bytes.SIZEOF_LONG + 
+      2 * ClassSize.REFERENCE);
+  
 }
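
The new FIXED_SIZE above follows the same ClassSize-based accounting used for HRegion's FIXED_OVERHEAD and DEEP_OVERHEAD: an aligned sum of the object header plus each primitive and reference field. A rough sketch of the convention on a hypothetical class (the field names are assumptions for illustration, not the actual ReadWriteConsistencyControl fields):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.ClassSize;

    // Hypothetical class showing the heap-accounting convention:
    // object header + primitive fields + reference fields, rounded up
    // by ClassSize.align.
    class CounterPairSketch {
      private long low;          // first long field
      private long high;         // second long field
      private Object lock;       // first reference field
      private Object waiters;    // second reference field

      static final long FIXED_SIZE = ClassSize.align(
          ClassSize.OBJECT +         // object header
          2 * Bytes.SIZEOF_LONG +    // low, high
          2 * ClassSize.REFERENCE);  // lock, waiters
    }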

Added: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java?rev=1134478&view=auto
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java (added)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java Fri Jun 10 23:29:40 2011
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Arrays;
+
+/**
+ * This class encapsulates a byte array and overrides hashCode and equals so
+ * that its identity is based on the data rather than the array instance.
+ */
+public class HashedBytes {
+
+  private final byte[] bytes;
+  private final int hashCode;
+
+  public HashedBytes(byte[] bytes) {
+    this.bytes = bytes;
+    hashCode = Bytes.hashCode(bytes);
+  }
+
+  public byte[] getBytes() {
+    return bytes;
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+    HashedBytes other = (HashedBytes) obj;
+    return Arrays.equals(bytes, other.bytes);
+  }
+
+  @Override
+  public String toString() {
+    return Bytes.toStringBinary(bytes);
+  }
+}
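
A short usage sketch of the new class (illustrative only, not part of the commit): two HashedBytes instances wrapping byte arrays with equal contents hash and compare as equal, so they address the same entry in a hash-based map even though the arrays themselves are distinct objects. That is what lets HRegion key its lockedRows ConcurrentHashMap on row bytes.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.HashedBytes;

    // Illustrative example; HashedBytesExample is not part of the patch.
    public class HashedBytesExample {
      public static void main(String[] args) {
        ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
            new ConcurrentHashMap<HashedBytes, CountDownLatch>();

        HashedBytes a = new HashedBytes(Bytes.toBytes("row-1"));
        HashedBytes b = new HashedBytes(Bytes.toBytes("row-1")); // different array, same contents

        lockedRows.putIfAbsent(a, new CountDownLatch(1));

        System.out.println(a.equals(b));               // true: contents match
        System.out.println(lockedRows.containsKey(b)); // true: resolves to the same entry
      }
    }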


