hbase-commits mailing list archives

From: bryanduxb...@apache.org
Subject: svn commit: r645740 - in /hadoop/hbase/trunk: ./ conf/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/
Date: Tue, 08 Apr 2008 00:09:21 GMT
Author: bryanduxbury
Date: Mon Apr  7 17:09:20 2008
New Revision: 645740

URL: http://svn.apache.org/viewvc?rev=645740&view=rev
Log:
HBASE-512 Add configuration for global aggregate memcache size

-Refactored Flusher slightly and added a reclaimMemcacheMemory method
-HRegionServer now calls reclaimMemcacheMemory during batchUpdate
-Added TestGlobalMemcacheLimit to verify the new functionality
-Added defaults for the new config parameters to hbase-default.xml
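
In outline, the change wires a memory check into the write path: before applying a batch update, the region server asks the flusher to reclaim memcache memory, and the flusher force-flushes the regions with the largest memcaches until aggregate usage drops below the low-water mark. The following is a minimal, self-contained sketch of that control flow, with names and structure simplified from the diffs below; it is not the actual HBase code.

import java.util.Collection;

/** Simplified sketch of the HBASE-512 flow; not the real HBase classes. */
class GlobalMemcacheLimitSketch {
  private final long globalMemcacheLimit;        // e.g. 512 * 1024 * 1024
  private final long globalMemcacheLimitLowMark; // e.g. half the upper limit
  private final Collection<RegionSketch> onlineRegions;

  GlobalMemcacheLimitSketch(long limit, long lowMark,
      Collection<RegionSketch> regions) {
    this.globalMemcacheLimit = limit;
    this.globalMemcacheLimitLowMark = lowMark;
    this.onlineRegions = regions;
  }

  /** Write path, mirroring the new call in HRegionServer.batchUpdate(). */
  void batchUpdate(RegionSketch region, byte[] update) {
    reclaimMemcacheMemory();   // may block the caller while forced flushes run
    region.apply(update);
  }

  /**
   * If the aggregate memcache is over the limit, flush the regions with the
   * biggest memcaches until usage is back under the low-water mark.
   */
  synchronized void reclaimMemcacheMemory() {
    if (globalMemcacheSize() < globalMemcacheLimit) {
      return;                  // under the limit, nothing to do
    }
    while (globalMemcacheSize() >= globalMemcacheLimitLowMark) {
      biggestRegion().flush();
    }
  }

  private long globalMemcacheSize() {
    long total = 0;
    for (RegionSketch r : onlineRegions) {
      total += r.memcacheSize();
    }
    return total;
  }

  private RegionSketch biggestRegion() {
    RegionSketch biggest = null;
    for (RegionSketch r : onlineRegions) {
      if (biggest == null || r.memcacheSize() > biggest.memcacheSize()) {
        biggest = r;
      }
    }
    return biggest;
  }

  /** Stand-in for HRegion; only the pieces this sketch needs. */
  interface RegionSketch {
    long memcacheSize();
    void flush();
    void apply(byte[] update);
  }
}

The real Flusher change below snapshots the online regions into a TreeMap ordered biggest-memcache-first instead of rescanning on every iteration, but the effect is the same.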

Added:
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/conf/hbase-default.xml
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=645740&r1=645739&r2=645740&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Apr  7 17:09:20 2008
@@ -18,6 +18,7 @@
   NEW FEATURES
    HBASE-548   Tool to online single region
    HBASE-71    Master should rebalance region assignments periodically
+   HBASE-512   Add configuration for global aggregate memcache size
    
   IMPROVEMENTS
    HBASE-469   Streamline HStore startup and compactions

Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=645740&r1=645739&r2=645740&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Mon Apr  7 17:09:20 2008
@@ -261,4 +261,21 @@
   	<description>TableFormatter to use outputting HQL result sets.
   	</description>
   </property>  
+  <property>
+    <name>hbase.regionserver.globalMemcacheLimit</name>
+    <value>536870912</value>
+    <description>Maximum size of all memcaches in a region server before new 
+      updates are blocked and flushes are forced. Defaults to 512MB.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.globalMemcacheLimitLowMark</name>
+    <value>268435456</value>
+    <description>When memcaches are being forced to flush to make room in
+      memory, keep flushing until we hit this mark. Defaults to 256MB. Setting
+      this value equal to hbase.regionserver.globalMemcacheLimit causes the 
+      minimum possible flushing to occur when updates are blocked due to 
+      memcache limiting.
+    </description>
+  </property>  
 </configuration>
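
Deployments can override these defaults either in hbase-site.xml or programmatically on an HBaseConfiguration, which is the pattern the new test below uses. A hedged illustration of the programmatic route follows; the values are arbitrary examples, not recommendations.

import org.apache.hadoop.hbase.HBaseConfiguration;

/** Illustrative override of the HBASE-512 limits; example values only. */
public class MemcacheLimitConfigExample {
  public static void main(String[] args) {
    HBaseConfiguration conf = new HBaseConfiguration();
    // Block updates and force flushes once the aggregate memcache hits 256MB...
    conf.setInt("hbase.regionserver.globalMemcacheLimit", 256 * 1024 * 1024);
    // ...and keep flushing until usage drops back under 128MB.
    conf.setInt("hbase.regionserver.globalMemcacheLimitLowMark", 128 * 1024 * 1024);
  }
}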

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java?rev=645740&r1=645739&r2=645740&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java Mon Apr  7
17:09:20 2008
@@ -118,6 +118,16 @@
     }
   }
 
+  /**
+   * @param serverNumber
+   * @return region server
+   */
+  public HRegionServer getRegionServer(int serverNumber) {
+    synchronized (regionThreads) {
+      return regionThreads.get(serverNumber).getRegionServer();
+    }
+  }
+
   /** runs region servers */
   public static class RegionServerThread extends Thread {
     private final HRegionServer regionServer;

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java?rev=645740&r1=645739&r2=645740&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java Mon Apr
 7 17:09:20 2008
@@ -25,6 +25,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +49,11 @@
   private final long optionalFlushPeriod;
   private final HRegionServer server;
   private final Integer lock = new Integer(0);
+  private final Integer memcacheSizeLock = new Integer(0);  
+  private long lastOptionalCheck = System.currentTimeMillis();
+
+  protected final long globalMemcacheLimit;
+  protected final long globalMemcacheLimitLowMark;
   
   /**
    * @param conf
@@ -54,67 +62,35 @@
   public Flusher(final HBaseConfiguration conf, final HRegionServer server) {
     super();
     this.server = server;
-    this.optionalFlushPeriod = conf.getLong(
+    optionalFlushPeriod = conf.getLong(
         "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
-    this.threadWakeFrequency = conf.getLong(
+    threadWakeFrequency = conf.getLong(
         HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+        
+    // default memcache limit of 512MB
+    globalMemcacheLimit = 
+      conf.getLong("hbase.regionserver.globalMemcacheLimit", 512 * 1024 * 1024);
+    // default memcache low mark limit of 256MB, which is half the upper limit
+    globalMemcacheLimitLowMark = 
+      conf.getLong("hbase.regionserver.globalMemcacheLimitLowMark", 
+        globalMemcacheLimit / 2);        
   }
   
   /** {@inheritDoc} */
   @Override
   public void run() {
-    long lastOptionalCheck = System.currentTimeMillis(); 
     while (!server.isStopRequested()) {
       HRegion r = null;
       try {
-        long now = System.currentTimeMillis();
-        if (now - threadWakeFrequency > lastOptionalCheck) {
-          lastOptionalCheck = now;
-          // Queue up regions for optional flush if they need it
-          Set<HRegion> regions = server.getRegionsToCheck();
-          for (HRegion region: regions) {
-            synchronized (regionsInQueue) {
-              if (!regionsInQueue.contains(region) &&
-                  (now - optionalFlushPeriod) > region.getLastFlushTime()) {
-                regionsInQueue.add(region);
-                flushQueue.add(region);
-                region.setLastFlushTime(now);
-              }
-            }
-          }
-        }
+        enqueueOptionalFlushRegions();
         r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (r != null) {
-          synchronized (regionsInQueue) {
-            regionsInQueue.remove(r);
-          }
-          synchronized (lock) { // Don't interrupt while we're working
-            if (r.flushcache()) {
-              server.compactSplitThread.compactionRequested(r);
-            }
-          }
+        if (!flushImmediately(r)) {
+          break;
         }
       } catch (InterruptedException ex) {
         continue;
       } catch (ConcurrentModificationException ex) {
         continue;
-      } catch (DroppedSnapshotException ex) {
-        // Cache flush can fail in a few places.  If it fails in a critical
-        // section, we get a DroppedSnapshotException and a replay of hlog
-        // is required. Currently the only way to do this is a restart of
-        // the server.
-        LOG.fatal("Replay of hlog required. Forcing server restart", ex);
-        if (!server.checkFileSystem()) {
-          break;
-        }
-        server.stop();
-      } catch (IOException ex) {
-        LOG.error("Cache flush failed" +
-          (r != null ? (" for region " + r.getRegionName()) : ""),
-          RemoteExceptionHandler.checkIOException(ex));
-        if (!server.checkFileSystem()) {
-          break;
-        }
       } catch (Exception ex) {
         LOG.error("Cache flush failed" +
           (r != null ? (" for region " + r.getRegionName()) : ""),
@@ -147,4 +123,108 @@
       interrupt();
     }
   }
+  
+  /**
+   * Flush a region right away, while respecting concurrency with the async
+   * flushing that is always going on.
+   */
+  private boolean flushImmediately(HRegion region) {
+    try {
+      if (region != null) {
+        synchronized (regionsInQueue) {
+          // take the region out of the set and the queue, if it happens to be 
+          // in the queue. This didn't use to be a constraint, but now that
+          // HBASE-512 is in play, we need to try to limit double-flushing
+          // regions.
+          regionsInQueue.remove(region);
+          flushQueue.remove(region);
+        }
+        synchronized (lock) { // Don't interrupt while we're working
+          if (region.flushcache()) {
+            server.compactSplitThread.compactionRequested(region);
+          }
+        }
+      }      
+    } catch (DroppedSnapshotException ex) {
+      // Cache flush can fail in a few places.  If it fails in a critical
+      // section, we get a DroppedSnapshotException and a replay of hlog
+      // is required. Currently the only way to do this is a restart of
+      // the server.
+      LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+      if (!server.checkFileSystem()) {
+        return false;
+      }
+      server.stop();
+    } catch (IOException ex) {
+      LOG.error("Cache flush failed" +
+        (region != null ? (" for region " + region.getRegionName()) : ""),
+        RemoteExceptionHandler.checkIOException(ex));
+      if (!server.checkFileSystem()) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  /**
+   * Find the regions that should be optionally flushed and put them on the
+   * flush queue.
+   */
+  private void enqueueOptionalFlushRegions() {
+    long now = System.currentTimeMillis();
+    if (now - threadWakeFrequency > lastOptionalCheck) {
+      lastOptionalCheck = now;
+      // Queue up regions for optional flush if they need it
+      Set<HRegion> regions = server.getRegionsToCheck();
+      for (HRegion region: regions) {
+        synchronized (regionsInQueue) {
+          if (!regionsInQueue.contains(region) &&
+              (now - optionalFlushPeriod) > region.getLastFlushTime()) {
+            regionsInQueue.add(region);
+            flushQueue.add(region);
+            region.setLastFlushTime(now);
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Check if the regionserver's memcache memory usage is greater than the 
+   * limit. If so, flush regions with the biggest memcaches until we're down
+   * to the lower limit. This method blocks callers until we're down to a safe
+   * amount of memcache consumption.
+   */
+  public void reclaimMemcacheMemory() {
+    synchronized (memcacheSizeLock) {
+      if (server.getGlobalMemcacheSize() >= globalMemcacheLimit) {
+        flushSomeRegions();
+      }
+    }
+  }
+  
+  private void flushSomeRegions() {
+    // we'll sort the regions in reverse
+    SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
+      new Comparator<Long>() {
+        public int compare(Long a, Long b) {
+          return -1 * a.compareTo(b);
+        }
+      }
+    );
+    
+    // copy over all the regions
+    for (HRegion region : server.onlineRegions.values()) {
+      sortedRegions.put(region.memcacheSize.get(), region);
+    }
+    
+    // keep flushing until we hit the low water mark
+    while (server.getGlobalMemcacheSize() >= globalMemcacheLimitLowMark) {
+      // flush the region with the biggest memcache
+      HRegion biggestMemcacheRegion = 
+        sortedRegions.remove(sortedRegions.firstKey());
+      flushImmediately(biggestMemcacheRegion);
+    }
+  }
+  
 }
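
One detail of the flushSomeRegions() change above that is easy to miss: the TreeMap is built with a comparator that negates the natural ordering of the Long keys, so firstKey() always yields the region with the largest memcache. A tiny standalone illustration of that ordering (not HBase code):

import java.util.Comparator;
import java.util.SortedMap;
import java.util.TreeMap;

/** Demonstrates the biggest-first ordering used by Flusher.flushSomeRegions(). */
public class ReverseOrderDemo {
  public static void main(String[] args) {
    SortedMap<Long, String> bySize = new TreeMap<Long, String>(
      new Comparator<Long>() {
        public int compare(Long a, Long b) {
          return -1 * a.compareTo(b);   // descending: largest key sorts first
        }
      });
    bySize.put(64L * 1024 * 1024, "regionA");
    bySize.put(128L * 1024 * 1024, "regionB");
    bySize.put(32L * 1024 * 1024, "regionC");
    // firstKey() is the largest memcache size, so regionB would be flushed first.
    System.out.println(bySize.get(bySize.firstKey()));   // prints "regionB"
  }
}

java.util.Collections.reverseOrder() would give the same descending ordering with less code.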

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=645740&r1=645739&r2=645740&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon
Apr  7 17:09:20 2008
@@ -1098,6 +1098,7 @@
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
     try {
+      cacheFlusher.reclaimMemcacheMemory();
       region.batchUpdate(b);
     } catch (IOException e) {
       checkFileSystem();
@@ -1403,6 +1404,20 @@
    */
   protected List<HMsg> getOutboundMsgs() {
     return this.outboundMsgs;
+  }
+
+  /**
+   * Return the total size of all memcaches in every region.
+   * @return memcache size in bytes
+   */
+  public long getGlobalMemcacheSize() {
+    long total = 0;
+    synchronized (onlineRegions) {
+      for (HRegion region : onlineRegions.values()) {
+        total += region.memcacheSize.get();
+      }
+    }
+    return total;
   }
   
   //

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=645740&r1=645739&r2=645740&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Mon Apr  7 17:09:20
2008
@@ -161,4 +161,13 @@
   public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() {
     return this.hbaseCluster.getRegionServers();
   }
+  
+  /**
+   * Grab a numbered region server of your choice.
+   * @param serverNumber
+   * @return region server
+   */
+  public HRegionServer getRegionServer(int serverNumber) {
+    return hbaseCluster.getRegionServer(serverNumber);
+  }
 }

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java?rev=645740&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java Mon Apr
 7 17:09:20 2008
@@ -0,0 +1,141 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Test setting the global memcache size for a region server. When it reaches 
+ * this size, any puts should be blocked while one or more forced flushes occur
+ * to bring the memcache size back down. 
+ */
+public class TestGlobalMemcacheLimit extends HBaseClusterTestCase {
+  final byte[] ONE_KB = new byte[1024];
+
+  HTable table1;
+  HTable table2;
+  HRegionServer server;
+  
+  long keySize = (new Text(COLFAMILY_NAME1)).getLength() + 9 + 8;
+  long rowSize = keySize + ONE_KB.length;
+  
+  /**
+   * Get our hands into the cluster configuration before the hbase cluster 
+   * starts up.
+   */
+  @Override
+  public void preHBaseClusterSetup() {
+    // we'll use a 2MB global memcache for testing's sake.
+    conf.setInt("hbase.regionserver.globalMemcacheLimit", 2 * 1024 * 1024);
+    // low memcache mark will be 1MB
+    conf.setInt("hbase.regionserver.globalMemcacheLimitLowMark", 
+      1 * 1024 * 1024);
+    // make sure we don't do any optional flushes and confuse my tests.
+    conf.setInt("hbase.regionserver.optionalcacheflushinterval", 120000);
+  }
+  
+  /**
+   * Create a table that we'll use to test.
+   */
+  @Override
+  public void postHBaseClusterSetup() throws IOException {
+    HTableDescriptor desc1 = createTableDescriptor("testTable1");
+    HTableDescriptor desc2 = createTableDescriptor("testTable2");
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc1);
+    admin.createTable(desc2);
+    table1 = new HTable(conf, new Text("testTable1"));
+    table2 = new HTable(conf, new Text("testTable2"));    
+    server = cluster.getRegionServer(0);    
+    
+    // there is a META region in play, and its edits are probably still in
+    // the memcache for ROOT. Flush it out.
+    for (HRegion region : server.getOnlineRegions().values()) {
+      region.flushcache();
+    }
+    // make sure we're starting at 0 so that it's easy to predict what the 
+    // results of our tests should be.
+    assertEquals("Starting memcache size", 0, server.getGlobalMemcacheSize());
+  }
+  
+  /**
+   * Make sure that the region server thinks all the memcaches are as big as we were
+   * hoping they would be.
+   */
+  public void testMemcacheSizeAccounting() throws IOException {
+    // put some data in each of the two tables
+    long dataSize = populate(table1, 500, 0) + populate(table2, 500, 0);
+    
+    // make sure the region server says it is using as much memory as we think
+    // it is.
+    assertEquals("Global memcache size", dataSize, 
+      server.getGlobalMemcacheSize());
+  }
+  
+  /**
+   * Test that a put gets blocked and a flush is forced as expected when we 
+   * reach the memcache size limit.
+   */
+  public void testBlocksAndForcesFlush() throws IOException {
+    // put some data in each of the two tables
+    long startingDataSize = populate(table1, 500, 0) + populate(table2, 500, 0);
+    
+    // at this point we have 1052000 bytes in memcache. Now we'll keep adding
+    // data to one of the tables until just before the global memcache limit,
+    // noting that the globalMemcacheSize keeps growing as expected. Then we'll
+    // do another put, causing it to go over the limit. When we look at the
+    // globalMemcacheSize now, it should be <= the low limit.
+    long dataNeeded = (2 * 1024 * 1024) - startingDataSize;
+    double numRows = (double)dataNeeded / (double)rowSize;
+    int preFlushRows = (int)Math.floor(numRows);
+  
+    long dataAdded = populate(table1, preFlushRows, 500);
+    assertEquals("Expected memcache size", dataAdded + startingDataSize, 
+      server.getGlobalMemcacheSize());
+        
+    populate(table1, 2, preFlushRows + 500);
+    assertTrue("Post-flush memcache size", server.getGlobalMemcacheSize() <= 1024 * 1024);
+  }
+  
+  private long populate(HTable table, int numRows, int startKey) throws IOException {
+    long total = 0;
+    BatchUpdate batchUpdate = null;
+    Text column = new Text(COLFAMILY_NAME1);
+    for (int i = startKey; i < startKey + numRows; i++) {
+      Text key = new Text("row_" + String.format("%1$5d", i));
+      total += key.getLength();
+      total += column.getLength();
+      total += 8;
+      total += ONE_KB.length;
+      batchUpdate = new BatchUpdate(key);
+      batchUpdate.put(column, ONE_KB);
+      table.commit(batchUpdate);
+    }
+    return total;
+  }
+}
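
For readers checking the test's arithmetic: each committed cell is counted as row key bytes plus column name bytes plus 8 bytes (likely the cell timestamp) plus the 1KB value, matching the keySize/rowSize fields above. The figure of 1052000 bytes quoted in testBlocksAndForcesFlush() works out only if COLFAMILY_NAME1 is 11 bytes long; that length is an assumption here, not something shown in this diff.

/** Rough per-row memcache accounting behind TestGlobalMemcacheLimit (sketch). */
public class RowSizeArithmetic {
  public static void main(String[] args) {
    int rowKeyBytes = "row_".length() + 5;  // "row_" plus a 5-character formatted number = 9
    int columnBytes = 11;                   // assumed length of COLFAMILY_NAME1
    int timestampBytes = 8;                 // likely the cell timestamp long
    int valueBytes = 1024;                  // the ONE_KB payload
    long rowSize = rowKeyBytes + columnBytes + timestampBytes + valueBytes;  // 1052
    // 1000 rows (500 per table) would then account for 1,052,000 bytes,
    // the figure quoted in testBlocksAndForcesFlush().
    System.out.println(1000 * rowSize);
  }
}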


