hbase-commits mailing list archives

From: j...@apache.org
Subject: svn commit: r651067 - in /hadoop/hbase/branches/0.1: ./ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Date: Wed, 23 Apr 2008 21:20:44 GMT
Author: jimk
Date: Wed Apr 23 14:20:42 2008
New Revision: 651067

URL: http://svn.apache.org/viewvc?rev=651067&view=rev
Log:
HBASE-572   Backport HBASE-512 to 0.1 branch

Added:
    hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/FlushRequester.java
    hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java
Removed:
    hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/CacheFlushListener.java
Modified:
    hadoop/hbase/branches/0.1/CHANGES.txt
    hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegion.java
    hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegionServer.java
    hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
    hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MultiRegionTable.java

Modified: hadoop/hbase/branches/0.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/CHANGES.txt?rev=651067&r1=651066&r2=651067&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.1/CHANGES.txt Wed Apr 23 14:20:42 2008
@@ -21,6 +21,7 @@
                filtering decision is made (Clint Morgan via Stack)
    HBASE-586   HRegion runs HStore memcache snapshotting -- fix it so only HStore
                knows about workings of memcache
+   HBASE-572   Backport HBASE-512 to 0.1 branch
 
   IMPROVEMENTS
    HBASE-559   MR example job to count table rows

Added: hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/FlushRequester.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/FlushRequester.java?rev=651067&view=auto
==============================================================================
--- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/FlushRequester.java (added)
+++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/FlushRequester.java Wed Apr 23 14:20:42 2008
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Implementors of this interface are notified when an HRegion determines
+ * that a cache flush is needed. A FlushRequester (or null) must be passed
+ * to the HRegion constructor.
+ */
+public interface FlushRequester {
+
+  /**
+   * Tell the implementor that the region's cache needs to be flushed.
+   * 
+   * @param region the HRegion requesting the cache flush
+   */
+  void request(HRegion region);
+}

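For context: the new interface decouples HRegion from the thread that services flushes. The region only signals that a flush is wanted; the implementor decides when to act on it. Below is a minimal sketch of an implementor (the class name and queueing policy are hypothetical, not the actual Flusher that appears later in this commit):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class SimpleFlushRequester implements FlushRequester {
      // Regions waiting to be flushed; a worker thread would drain this queue.
      private final BlockingQueue<HRegion> pending =
          new LinkedBlockingQueue<HRegion>();

      public void request(HRegion region) {
        pending.add(region); // non-blocking: callers are never held up here
      }
    }
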
Modified: hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegion.java?rev=651067&r1=651066&r2=651067&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegion.java Wed Apr 23 14:20:42 2008
@@ -321,15 +321,19 @@
     volatile boolean writesEnabled = true;
   }
 
-  volatile WriteState writestate = new WriteState();
+  private volatile WriteState writestate = new WriteState();
 
   final int memcacheFlushSize;
   private volatile long lastFlushTime;
-  final CacheFlushListener flushListener;
-  final int blockingMemcacheSize;
-  protected final long threadWakeFrequency;
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final Integer updateLock = new Integer(0);
+  final FlushRequester flushListener;
+  private final int blockingMemcacheSize;
+  final long threadWakeFrequency;
+  // Used to guard splits and closes
+  private final ReentrantReadWriteLock splitsAndClosesLock =
+    new ReentrantReadWriteLock();
+  // Stop updates lock
+  private final ReentrantReadWriteLock updateLock =
+    new ReentrantReadWriteLock();
   private final Integer splitLock = new Integer(0);
   private final long desiredMaxFileSize;
   private final long minSequenceId;
@@ -360,7 +364,7 @@
    * @throws IOException
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
+      HRegionInfo regionInfo, Path initialFiles, FlushRequester listener)
     throws IOException {
     this(basedir, log, fs, conf, regionInfo, initialFiles, listener, null);
   }
@@ -389,7 +393,7 @@
    * @throws IOException
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener,
+      HRegionInfo regionInfo, Path initialFiles, FlushRequester listener,
       final Progressable reporter)
     throws IOException {
     
@@ -548,13 +552,12 @@
           }
         }
       }
-      lock.writeLock().lock();
-      LOG.debug("new updates and scanners for region " + regionName +
-          " disabled");
+      splitsAndClosesLock.writeLock().lock();
+      LOG.debug("new updates and scanners for region " + regionName + " disabled");
       
       try {
-        // Wait for active scanners to finish. The write lock we hold will prevent
-        // new scanners from being created.
+        // Wait for active scanners to finish. The write lock we hold will
+        // prevent new scanners from being created.
         synchronized (activeScannerCount) {
           while (activeScannerCount.get() != 0) {
             LOG.debug("waiting for " + activeScannerCount.get() +
@@ -602,7 +605,7 @@
         LOG.info("closed " + this.regionInfo.getRegionName());
         return result;
       } finally {
-        lock.writeLock().unlock();
+        splitsAndClosesLock.writeLock().unlock();
       }
     }
   }
@@ -661,6 +664,11 @@
     return this.lastFlushTime;
   }
   
+  /** @param t the lastFlushTime */
+  void setLastFlushTime(long t) {
+    this.lastFlushTime = t;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
   // HRegion maintenance.  
   //
@@ -920,8 +928,7 @@
         }
       }
       long startTime = System.currentTimeMillis();
-      LOG.info("starting compaction on region " +
-        this.regionInfo.getRegionName().toString());
+      LOG.info("starting compaction on region " + getRegionName());
       boolean status = true;
       doRegionCompactionPrep();
       for (HStore store : stores.values()) {
@@ -930,8 +937,7 @@
         }
       }
       doRegionCompactionCleanup();
-      LOG.info("compaction completed on region " +
-        this.regionInfo.getRegionName().toString() + ". Took " +
+      LOG.info("compaction completed on region " + getRegionName() + ". " +
         StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
       return status;
       
@@ -981,11 +987,12 @@
       }
     }
     try {
-      lock.readLock().lock();                      // Prevent splits and closes
+      // Prevent splits and closes
+      splitsAndClosesLock.readLock().lock();
       try {
         return internalFlushcache();
       } finally {
-        lock.readLock().unlock();
+        splitsAndClosesLock.readLock().unlock();
       }
     } finally {
       synchronized (writestate) {
@@ -1043,10 +1050,13 @@
     // to do this for a moment.  Its quick.  The subsequent sequence id that
     // goes into the HLog after we've flushed all these snapshots also goes
     // into the info file that sits beside the flushed files.
-    synchronized (updateLock) {
+    updateLock.writeLock().lock();
+    try {
       for (HStore s: stores.values()) {
         s.memcache.snapshot();
       }
+    } finally {
+      updateLock.writeLock().unlock();
     }
     long sequenceId = log.startCacheFlush();
 
@@ -1223,7 +1233,7 @@
     
     HStoreKey key = null;
     checkRow(row);
-    lock.readLock().lock();
+    splitsAndClosesLock.readLock().lock();
     try {
       // examine each column family for the preceeding or matching key
       for(Text colFamily : stores.keySet()){
@@ -1258,7 +1268,7 @@
       
       return result;
     } finally {
-      lock.readLock().unlock();
+      splitsAndClosesLock.readLock().unlock();
     }
   }
 
@@ -1304,7 +1314,7 @@
    */
   public HScannerInterface getScanner(Text[] cols, Text firstRow,
       long timestamp, RowFilterInterface filter) throws IOException {
-    lock.readLock().lock();
+    splitsAndClosesLock.readLock().lock();
     try {
       if (this.closed.get()) {
         throw new IOException("Region " + this.getRegionName().toString() +
@@ -1326,7 +1336,7 @@
       return new HScanner(cols, firstRow, timestamp,
         storelist.toArray(new HStore [storelist.size()]), filter);
     } finally {
-      lock.readLock().unlock();
+      splitsAndClosesLock.readLock().unlock();
     }
   }
 
@@ -1581,7 +1591,9 @@
     if (updatesByColumn == null || updatesByColumn.size() <= 0) {
       return;
     }
-    synchronized (updateLock) {                         // prevent a cache flush
+    boolean flush = false;
+    updateLock.readLock().lock();                      // prevent a cache flush
+    try {
       this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), updatesByColumn);
 
@@ -1592,10 +1604,13 @@
         size = this.memcacheSize.addAndGet(getEntrySize(key, val));
         stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
       }
-      if (this.flushListener != null && size > this.memcacheFlushSize) {
-        // Request a cache flush
-        this.flushListener.flushRequested(this);
-      }
+      flush = this.flushListener != null && size > this.memcacheFlushSize;
+    } finally {
+      updateLock.readLock().unlock();
+    }
+    if (flush) {
+      // Request a cache flush
+      this.flushListener.request(this);
     }
   }
   
@@ -1665,10 +1680,10 @@
    */
   long obtainRowLock(Text row) throws IOException {
     checkRow(row);
-    lock.readLock().lock();
+    splitsAndClosesLock.readLock().lock();
     try {
       if (this.closed.get()) {
-        throw new IOException("Region " + this.getRegionName().toString() +
+        throw new NotServingRegionException("Region " + getRegionName() +
           " closed");
       }
       synchronized (rowsToLocks) {
@@ -1686,7 +1701,7 @@
         return lid.longValue();
       }
     } finally {
-      lock.readLock().unlock();
+      splitsAndClosesLock.readLock().unlock();
     }
   }
   
@@ -1725,6 +1740,18 @@
   
   /** {@inheritDoc} */
   @Override
+  public boolean equals(Object o) {
+    return this.hashCode() == ((HRegion)o).hashCode();
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode() {
+    return this.regionInfo.getRegionName().hashCode();
+  }
+  
+  /** {@inheritDoc} */
+  @Override
   public String toString() {
     return regionInfo.getRegionName().toString();
   }
@@ -1922,6 +1949,7 @@
       }
     }
 
+    /** {@inheritDoc} */
     public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
       throw new UnsupportedOperationException("Unimplemented serverside. " +
         "next(HStoreKey, StortedMap(...) is more efficient");

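The key concurrency change in HRegion above is replacing the old updateLock monitor with a ReentrantReadWriteLock: concurrent updates share the read lock, while the flush snapshot briefly takes the write lock so it sees a consistent memcache. A self-contained sketch of the pattern (the Runnable parameters are illustrative stand-ins; HRegion inlines the bodies):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class UpdateLockPattern {
      private final ReentrantReadWriteLock updateLock =
          new ReentrantReadWriteLock();

      void applyUpdate(Runnable edit) {
        updateLock.readLock().lock(); // many updates may run in parallel
        try {
          edit.run();
        } finally {
          updateLock.readLock().unlock();
        }
      }

      void snapshotForFlush(Runnable snapshot) {
        updateLock.writeLock().lock(); // exclude all updates for a moment
        try {
          snapshot.run(); // quick: just snapshot each store's memcache
        } finally {
          updateLock.writeLock().unlock();
        }
      }
    }

Note also that the patch moves the flushListener.request call outside the locked section, so the update path never calls into the flusher while holding the lock.
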
Modified: hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=651067&r1=651066&r2=651067&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HRegionServer.java Wed Apr 23 14:20:42 2008
@@ -26,6 +26,7 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,12 +39,11 @@
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.DelayQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -161,78 +161,22 @@
     
   }
 
-  /** Queue entry passed to flusher, compactor and splitter threads */
-  class QueueEntry implements Delayed {
-    private final HRegion region;
-    private long expirationTime;
-
-    QueueEntry(HRegion region, long expirationTime) {
-      this.region = region;
-      this.expirationTime = expirationTime;
-    }
-    
-    /** {@inheritDoc} */
-    @Override
-    public boolean equals(Object o) {
-      QueueEntry other = (QueueEntry) o;
-      return this.hashCode() == other.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int hashCode() {
-      return this.region.getRegionInfo().hashCode();
-    }
-
-    /** {@inheritDoc} */
-    public long getDelay(TimeUnit unit) {
-      return unit.convert(this.expirationTime - System.currentTimeMillis(),
-          TimeUnit.MILLISECONDS);
-    }
-
-    /** {@inheritDoc} */
-    public int compareTo(Delayed o) {
-      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
-        o.getDelay(TimeUnit.MILLISECONDS);
-
-      int value = 0;
-      if (delta > 0) {
-        value = 1;
-        
-      } else if (delta < 0) {
-        value = -1;
-      }
-      return value;
-    }
-
-    /** @return the region */
-    public HRegion getRegion() {
-      return region;
-    }
-
-    /** @param expirationTime the expirationTime to set */
-    public void setExpirationTime(long expirationTime) {
-      this.expirationTime = expirationTime;
-    }
-  }
-
   // Compactions
   final CompactSplitThread compactSplitThread;
-  // Needed during shutdown so we send an interrupt after completion of a
-  // compaction, not in the midst.
-  final Integer compactSplitLock = new Integer(0);
 
-  /** Compact region on request and then run split if appropriate
-   */
+  /** Compact region on request and then run split if appropriate */
   private class CompactSplitThread extends Thread
   implements RegionUnavailableListener {
     private HTable root = null;
     private HTable meta = null;
     private long startTime;
     private final long frequency;
+    private final ReentrantLock workingLock = new ReentrantLock();
     
-    private final BlockingQueue<QueueEntry> compactionQueue =
-      new LinkedBlockingQueue<QueueEntry>();
+    private final BlockingQueue<HRegion> compactionQueue =
+      new LinkedBlockingQueue<HRegion>();
+
+    private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
 
     /** constructor */
     public CompactSplitThread() {
@@ -246,21 +190,28 @@
     @Override
     public void run() {
       while (!stopRequested.get()) {
-        QueueEntry e = null;
+        HRegion r = null;
         try {
-          e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-          if (e == null) {
-            continue;
-          }
-          synchronized (compactSplitLock) { // Don't interrupt us while working
-            e.getRegion().compactIfNeeded();
-            split(e.getRegion());
+          r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
+          if (r != null) {
+            synchronized (regionsInQueue) {
+              regionsInQueue.remove(r);
+            }
+            workingLock.lock();
+            try {
+              // Don't interrupt us while we are working
+              if (r.compactStores()) {
+                split(r);
+              }
+            } finally {
+              workingLock.unlock();
+            }
           }
         } catch (InterruptedException ex) {
           continue;
         } catch (IOException ex) {
           LOG.error("Compaction failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+              (r != null ? (" for region " + r.getRegionName()) : ""),
               RemoteExceptionHandler.checkIOException(ex));
           if (!checkFileSystem()) {
             break;
@@ -268,25 +219,28 @@
 
         } catch (Exception ex) {
           LOG.error("Compaction failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
-              ex);
+              (r != null ? (" for region " + r.getRegionName()) : ""), ex);
           if (!checkFileSystem()) {
             break;
           }
         }
       }
+      regionsInQueue.clear();
+      compactionQueue.clear();
       LOG.info(getName() + " exiting");
     }
     
     /**
-     * @param e QueueEntry for region to be compacted
+     * @param r region to be compacted
      */
-    public void compactionRequested(QueueEntry e) {
-      compactionQueue.add(e);
-    }
-    
-    void compactionRequested(final HRegion r) {
-      compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
+    public void compactionRequested(HRegion r) {
+      LOG.debug("Compaction requested for region: " + r.getRegionName());
+      synchronized (regionsInQueue) {
+        if (!regionsInQueue.contains(r)) {
+          compactionQueue.add(r);
+          regionsInQueue.add(r);
+        }
+      }
     }
     
     private void split(final HRegion region) throws IOException {
@@ -380,20 +334,32 @@
         lock.writeLock().unlock();
       }
     }
+
+    /**
+     * Only interrupt once it's done with a run through the work loop.
+     */ 
+    public void interruptIfNecessary() {
+      if (workingLock.tryLock()) {
+        this.interrupt();
+      }
+    }
   }
   
   // Cache flushing  
   final Flusher cacheFlusher;
-  // Needed during shutdown so we send an interrupt after completion of a
-  // flush, not in the midst.
-  final Integer cacheFlusherLock = new Integer(0);
-  
-  /** Flush cache upon request */
-  class Flusher extends Thread implements CacheFlushListener {
-    private final DelayQueue<QueueEntry> flushQueue =
-      new DelayQueue<QueueEntry>();
+  /**
+   * Thread that flushes cache on request
+   * @see FlushRequester
+   */
+  private class Flusher extends Thread implements FlushRequester {
+    private final BlockingQueue<HRegion> flushQueue =
+      new LinkedBlockingQueue<HRegion>();
 
+    private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
+    private final ReentrantLock workingLock = new ReentrantLock();
     private final long optionalFlushPeriod;
+    private final long globalMemcacheLimit;
+    private final long globalMemcacheLimitLowMark;
     
     /** constructor */
     public Flusher() {
@@ -401,56 +367,86 @@
       this.optionalFlushPeriod = conf.getLong(
         "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
 
+      // default memcache limit of 512MB
+      globalMemcacheLimit = 
+        conf.getLong("hbase.regionserver.globalMemcacheLimit", 512 * 1024 * 1024);
+      // default memcache low mark limit of 256MB, which is half the upper limit
+      globalMemcacheLimitLowMark = 
+        conf.getLong("hbase.regionserver.globalMemcacheLimitLowMark", 
+          globalMemcacheLimit / 2);        
     }
     
     /** {@inheritDoc} */
     @Override
     public void run() {
       while (!stopRequested.get()) {
-        QueueEntry e = null;
+        HRegion r = null;
         try {
-          e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-          if (e == null) {
+          enqueueOptionalFlushRegions();
+          r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          if (r == null) {
             continue;
           }
-          synchronized(cacheFlusherLock) { // Don't interrupt while we're working
-            if (e.getRegion().flushcache()) {
-              compactSplitThread.compactionRequested(e);
-            }
-              
-            e.setExpirationTime(System.currentTimeMillis() +
-                optionalFlushPeriod);
-            flushQueue.add(e);
-          }
-          
-          // Now insure that all the active regions are in the queue
-          
-          Set<HRegion> regions = getRegionsToCheck();
-          for (HRegion r: regions) {
-            e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
-            synchronized (flushQueue) {
-              if (!flushQueue.contains(e)) {
-                flushQueue.add(e);
-              }
-            }
-          }
-
-          // Now make sure that the queue only contains active regions
-
-          synchronized (flushQueue) {
-            for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext();  ) {
-              e = i.next();
-              if (!regions.contains(e.getRegion())) {
-                i.remove();
-              }
-            }
+          if (!flushRegion(r, false)) {
+            break;
           }
         } catch (InterruptedException ex) {
           continue;
-
         } catch (ConcurrentModificationException ex) {
           continue;
-
+        } catch (Exception ex) {
+          LOG.error("Cache flush failed" +
+              (r != null ? (" for region " + r.getRegionName()) : ""), ex);
+          if (!checkFileSystem()) {
+            break;
+          }
+        }
+      }
+      regionsInQueue.clear();
+      flushQueue.clear();
+      LOG.info(getName() + " exiting");
+    }
+    
+    /**
+     * Flush a region right away, while respecting concurrency with the async
+     * flushing that is always going on.
+     * 
+     * @param region the region to be flushed
+     * @param removeFromQueue true if the region needs to be removed from the
+     * flush queue: false when called from the main run loop, true when called
+     * from flushSomeRegions to relieve memory pressure on the region server.
+     * 
+     * <p>In the main run loop the region has already been removed from the
+     * flush queue, but when this method is called to relieve memory pressure
+     * that is not necessarily true. We want to avoid removing a region from
+     * the queue needlessly, because if it has already been removed, it
+     * requires a sequential scan of the queue just to determine that it is
+     * not there.
+     * 
+     * <p>If called from flushSomeRegions, the region may still be in the queue
+     * but may have been found to hold a significant amount of memory and so
+     * need flushing to relieve memory pressure. In that case its flush preempts
+     * the pending request in the queue, which must then be removed to avoid
+     * flushing the region multiple times.
+     * 
+     * @return true if the region was successfully flushed, false otherwise. If
+     * false, there will be accompanying log messages explaining why the region
+     * was not flushed.
+     */
+    private boolean flushRegion(HRegion region, boolean removeFromQueue) {
+      synchronized (regionsInQueue) {
+        // take the region out of the set. If removeFromQueue is true, remove it
+        // from the queue too if it is there. This wasn't previously a constraint,
+        // but now that HBASE-512 is in play, we need to try to limit
+        // double-flushing of regions.
+        if (regionsInQueue.remove(region) && removeFromQueue) {
+          flushQueue.remove(region);
+        }
+        workingLock.lock();
+        try {
+          if (region.flushcache()) {
+            compactSplitThread.compactionRequested(region);
+          }
         } catch (DroppedSnapshotException ex) {
           // Cache flush can fail in a few places.  If it fails in a critical
           // section, we get a DroppedSnapshotException and a replay of hlog
@@ -458,42 +454,122 @@
           // the server.
           LOG.fatal("Replay of hlog required. Forcing server restart", ex);
           if (!checkFileSystem()) {
-            break;
+            return false;
           }
-          HRegionServer.this.stop();
-
+          server.stop();
+          return false;
         } catch (IOException ex) {
           LOG.error("Cache flush failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+              (region != null ? (" for region " + region.getRegionName()) : ""),
               RemoteExceptionHandler.checkIOException(ex));
           if (!checkFileSystem()) {
-            break;
-          }
-
-        } catch (Exception ex) {
-          LOG.error("Cache flush failed" +
-              (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
-              ex);
-          if (!checkFileSystem()) {
-            break;
+            return false;
           }
+        } finally {
+          workingLock.unlock();
         }
       }
-      flushQueue.clear();
-      LOG.info(getName() + " exiting");
+      return true;
     }
     
+    /**
+     * Find the regions that should be optionally flushed and put them on the
+     * flush queue.
+     */
+    private void enqueueOptionalFlushRegions() {
+      long now = System.currentTimeMillis();
+      // Queue up regions for optional flush if they need it
+      for (HRegion region: getRegionsToCheck()) {
+        optionallyAddRegion(region, now);
+      }
+    }
+    
+    /*
+     * Add region if not already added and if optional flush period has been
+     * exceeded.
+     * @param r Region to add.
+     * @param now The 'now' to use.  Set last flush time to this value.
+     */
+    private void optionallyAddRegion(final HRegion r, final long now) {
+      synchronized (regionsInQueue) {
+        if (!regionsInQueue.contains(r) &&
+            (System.currentTimeMillis() - optionalFlushPeriod) >
+              r.getLastFlushTime()) {
+          addRegion(r, now);
+        }
+      }
+    }
+    
+    /*
+     * Add region if not already added.
+     * @param r Region to add.
+     * @param now The 'now' to use.  Set last flush time to this value.
+     */
+    private void addRegion(final HRegion r, final long now) {
+      synchronized (regionsInQueue) {
+        if (!regionsInQueue.contains(r)) {
+          regionsInQueue.add(r);
+          flushQueue.add(r);
+          r.setLastFlushTime(now);
+        }
+      }
+    }
+
     /** {@inheritDoc} */
-    public void flushRequested(HRegion region) {
-      if (region == null) {
-          return;
+    public void request(HRegion r) {
+      addRegion(r, System.currentTimeMillis());
+    }
+    
+    /**
+     * Check if the regionserver's memcache memory usage is greater than the 
+     * limit. If so, flush regions with the biggest memcaches until we're down
+     * to the lower limit. This method blocks callers until we're down to a safe
+     * amount of memcache consumption.
+     */
+    public synchronized void reclaimMemcacheMemory() {
+      long globalMemory = getGlobalMemcacheSize();
+      if (globalMemory >= globalMemcacheLimit) {
+        LOG.info("Global cache memory in use " + globalMemory + " >= " +
+            globalMemcacheLimit + " configured maximum." +
+        " Forcing cache flushes to relieve memory pressure.");
+        flushSomeRegions();
+      }
+    }
+    
+    private void flushSomeRegions() {
+      // we'll sort the regions in reverse
+      SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
+          new Comparator<Long>() {
+            public int compare(Long a, Long b) {
+              return -1 * a.compareTo(b);
+            }
+          }
+      );
+      
+      // copy over all the regions
+      for (HRegion region : getRegionsToCheck()) {
+        sortedRegions.put(region.memcacheSize.get(), region);
       }
-      QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
-      synchronized (flushQueue) {
-        if (flushQueue.contains(e)) {
-          flushQueue.remove(e);
+      
+      // keep flushing until we hit the low water mark
+      while (getGlobalMemcacheSize() >= globalMemcacheLimitLowMark) {
+        // flush the region with the biggest memcache
+        HRegion biggestMemcacheRegion = 
+          sortedRegions.remove(sortedRegions.firstKey());
+        LOG.info("Force flush of region " + biggestMemcacheRegion.getRegionName());
+        if (!flushRegion(biggestMemcacheRegion, true)) {
+          // Something bad happened - give up.
+          break;
         }
-        flushQueue.add(e);
+      }
+    }
+
+    /**
+     * Only interrupt once it's done with a run through the work loop.
+     */ 
+    void interruptIfNecessary() {
+      if (workingLock.tryLock()) {
+        this.interrupt();
       }
     }
   }
@@ -505,7 +581,7 @@
   final Integer logRollerLock = new Integer(0);
   
   /** Runs periodically to determine if the HLog should be rolled */
-  class LogRoller extends Thread implements LogRollListener {
+  private class LogRoller extends Thread implements LogRollListener {
     private final Integer rollLock = new Integer(0);
     private volatile boolean rollLog;
     
@@ -784,12 +860,9 @@
 
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
-    synchronized(cacheFlusherLock) {
-      this.cacheFlusher.interrupt();
-    }
-    synchronized (compactSplitLock) {
-      this.compactSplitThread.interrupt();
-    }
+    this.cacheFlusher.interruptIfNecessary();
+    this.compactSplitThread.interruptIfNecessary();
+
     synchronized (logRollerLock) {
       this.logRoller.interrupt();
     }
@@ -1467,6 +1540,7 @@
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
     try {
+      cacheFlusher.reclaimMemcacheMemory();
       region.batchUpdate(timestamp, b);
     } catch (IOException e) {
       checkFileSystem();
@@ -1616,8 +1690,8 @@
     return this.requestCount;
   }
 
-  /** @return reference to CacheFlushListener */
-  public CacheFlushListener getCacheFlushListener() {
+  /** @return reference to FlushRequester */
+  public FlushRequester getFlushRequester() {
     return this.cacheFlusher;
   }
   
@@ -1706,12 +1780,8 @@
    */
   protected Set<HRegion> getRegionsToCheck() {
     HashSet<HRegion> regionsToCheck = new HashSet<HRegion>();
-    //TODO: is this locking necessary? 
-    lock.readLock().lock();
-    try {
+    synchronized (this.onlineRegions) {
       regionsToCheck.addAll(this.onlineRegions.values());
-    } finally {
-      lock.readLock().unlock();
     }
     // Purge closed regions.
     for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) {
@@ -1738,6 +1808,18 @@
    */
   protected List<HMsg> getOutboundMsgs() {
     return this.outboundMsgs;
+  }
+  
+  /**
+   * Return the total size of all memcaches in every region.
+   * @return memcache size in bytes
+   */
+  long getGlobalMemcacheSize() {
+    long total = 0;
+    for (HRegion region : getRegionsToCheck()) {
+      total += region.memcacheSize.get();
+    }
+    return total;
   }
   
   //

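The heart of the HBASE-512 backport is the water-mark logic above: reclaimMemcacheMemory compares the server-wide memcache total against the upper limit, and flushSomeRegions then flushes the regions with the biggest memcaches until usage is back under the low mark. A simplified, self-contained sketch of that selection (the region names and Map input are stand-ins for the server's HRegion set and its memcacheSize counters):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;

    class ReclaimSketch {
      /**
       * Pick regions to flush, biggest memcache first, until the remaining
       * total drops below lowMark.
       */
      static List<String> pickRegionsToFlush(Map<String, Long> sizes,
          long lowMark) {
        List<Map.Entry<String, Long>> regions =
            new ArrayList<Map.Entry<String, Long>>(sizes.entrySet());
        // Largest memcache first, mirroring the reverse-sorted TreeMap above.
        regions.sort(Map.Entry.<String, Long>comparingByValue(
            Comparator.reverseOrder()));
        long total = sizes.values().stream().mapToLong(Long::longValue).sum();
        List<String> toFlush = new ArrayList<String>();
        for (Map.Entry<String, Long> e : regions) {
          if (total < lowMark) {
            break;
          }
          toFlush.add(e.getKey());
          total -= e.getValue();
        }
        return toFlush;
      }
    }

The real loop re-reads getGlobalMemcacheSize() after each forced flush rather than doing this arithmetic, since flushes race with incoming updates.
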
Modified: hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java?rev=651067&r1=651066&r2=651067&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (original)
+++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java Wed Apr 23 14:20:42 2008
@@ -114,6 +114,16 @@
     }
   }
 
+  /**
+   * @param serverNumber index of the region server to fetch
+   * @return region server
+   */
+  public HRegionServer getRegionServer(int serverNumber) {
+    synchronized (regionThreads) {
+      return regionThreads.get(serverNumber).getRegionServer();
+    }
+  }
+
   /** runs region servers */
   public static class RegionServerThread extends Thread {
     private final HRegionServer regionServer;

Modified: hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=651067&r1=651066&r2=651067&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Wed Apr 23 14:20:42 2008
@@ -276,4 +276,13 @@
   public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() {
     return this.hbaseCluster.getRegionServers();
   }
+
+  /**
+   * Grab a numbered region server of your choice.
+   * @param serverNumber
+   * @return region server
+   */
+  public HRegionServer getRegionServer(int serverNumber) {
+    return hbaseCluster.getRegionServer(serverNumber);
+  }
 }

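A one-line usage sketch of the new accessor, matching what the test added below does in its setup (index 0 is the first, and in a default single-server test cluster the only, region server):

    HRegionServer server = cluster.getRegionServer(0);
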
Modified: hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=651067&r1=651066&r2=651067&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Wed Apr 23 14:20:42 2008
@@ -104,8 +104,8 @@
     }
 
     // Flush the cache
-    cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener().
-      flushRequested(r);
+    cluster.getRegionThreads().get(0).getRegionServer().getFlushRequester().
+      request(r);
 
     // Now, wait until split makes it into the meta table.
     int oldCount = count;

Added: hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java?rev=651067&view=auto
==============================================================================
--- hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java (added)
+++ hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java Wed Apr 23 14:20:42 2008
@@ -0,0 +1,142 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Test setting the global memcache size for a region server. When it reaches 
+ * this size, any puts should be blocked while one or more forced flushes occur
+ * to bring the memcache size back down. 
+ */
+public class TestGlobalMemcacheLimit extends HBaseClusterTestCase {
+  final byte[] ONE_KB = new byte[1024];
+
+  HTable table1;
+  HTable table2;
+  HRegionServer server;
+  
+  long keySize = (new Text(COLFAMILY_NAME1)).getLength() + 9 + 8;
+  long rowSize = keySize + ONE_KB.length;
+
+  /** {@inheritDoc} */
+  @Override
+  public void setUp() throws Exception {
+    preHBaseClusterSetup();
+    super.setUp();
+    postHBaseClusterSetup();
+  }
+  
+  /**
+   * Get our hands into the cluster configuration before the hbase cluster 
+   * starts up.
+   */
+  private void preHBaseClusterSetup() {
+    // we'll use a 2MB global memcache for testing's sake.
+    conf.setInt("hbase.regionserver.globalMemcacheLimit", 2 * 1024 * 1024);
+    // low memcache mark will be 1MB
+    conf.setInt("hbase.regionserver.globalMemcacheLimitLowMark", 
+      1 * 1024 * 1024);
+    // make sure we don't do any optional flushes and confuse my tests.
+    conf.setInt("hbase.regionserver.optionalcacheflushinterval", 120000);
+  }
+  
+  /**
+   * Create a table that we'll use to test.
+   */
+  private void postHBaseClusterSetup() throws IOException {
+    HTableDescriptor desc1 = createTableDescriptor("testTable1");
+    HTableDescriptor desc2 = createTableDescriptor("testTable2");
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc1);
+    admin.createTable(desc2);
+    table1 = new HTable(conf, new Text("testTable1"));
+    table2 = new HTable(conf, new Text("testTable2"));    
+    server = cluster.getRegionServer(0);    
+    
+    // there is a META region in play, and its edits are probably still in
+    // the ROOT region's memcache. Flush them out.
+    for (HRegion region : server.getOnlineRegions().values()) {
+      region.flushcache();
+    }
+    // make sure we're starting at 0 so that it's easy to predict what the 
+    // results of our tests should be.
+    assertEquals("Starting memcache size", 0, server.getGlobalMemcacheSize());
+  }
+  
+  /**
+   * Make sure that the region server thinks all the memcaches are as big as
+   * we were hoping they would be.
+   * @throws IOException
+   */
+  public void testMemcacheSizeAccounting() throws IOException {
+    // put some data in each of the two tables
+    long dataSize = populate(table1, 500, 0) + populate(table2, 500, 0);
+    
+    // make sure the region server says it is using as much memory as we think
+    // it is.
+    assertEquals("Global memcache size", dataSize, 
+      server.getGlobalMemcacheSize());
+  }
+  
+  /**
+   * Test that a put gets blocked and a flush is forced as expected when we 
+   * reach the memcache size limit.
+   * @throws IOException
+   */
+  public void testBlocksAndForcesFlush() throws IOException {
+    // put some data in each of the two tables
+    long startingDataSize = populate(table1, 500, 0) + populate(table2, 500, 0);
+    
+    // At this point we have 1052000 bytes in memcache. Now, we'll keep adding
+    // data to one of the tables until just before the global memcache limit,
+    // checking that the globalMemcacheSize keeps growing as expected. Then we'll
+    // do another put, causing it to go over the limit. When we look at the
+    // globalMemcacheSize now, it should be <= the low limit.
+    long dataNeeded = (2 * 1024 * 1024) - startingDataSize;
+    double numRows = (double)dataNeeded / (double)rowSize;
+    int preFlushRows = (int)Math.floor(numRows);
+  
+    long dataAdded = populate(table1, preFlushRows, 500);
+    assertEquals("Expected memcache size", dataAdded + startingDataSize, 
+      server.getGlobalMemcacheSize());
+        
+    populate(table1, 2, preFlushRows + 500);
+    assertTrue("Post-flush memcache size", server.getGlobalMemcacheSize() <= 1024 * 1024);
+  }
+  
+  private long populate(HTable table, int numRows, int startKey) throws IOException {
+    long total = 0;
+    Text column = new Text(COLFAMILY_NAME1);
+    for (int i = startKey; i < startKey + numRows; i++) {
+      Text key = new Text("row_" + String.format("%1$5d", i));
+      total += key.getLength();
+      total += column.getLength();
+      total += 8;
+      total += ONE_KB.length;
+      long id = table.startUpdate(key);
+      table.put(id, column, ONE_KB);
+      table.commit(id);
+    }
+    return total;
+  }
+}

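The two configuration keys the test overrides are the ones the new Flusher reads in its constructor. A minimal sketch of setting them before cluster startup (the values here mirror the code's 512MB default and half-of-limit low mark, not the test's 2MB/1MB):

    HBaseConfiguration conf = new HBaseConfiguration();
    conf.setLong("hbase.regionserver.globalMemcacheLimit",
        512L * 1024 * 1024);
    conf.setLong("hbase.regionserver.globalMemcacheLimitLowMark",
        256L * 1024 * 1024);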