hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1483022 - in /hbase/branches/0.94: security/src/test/resources/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/resources/ src/test/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/regionserver/...
Date Wed, 15 May 2013 19:23:45 GMT
Author: larsh
Date: Wed May 15 19:23:45 2013
New Revision: 1483022

URL: http://svn.apache.org/r1483022
Log:
HBASE-5930 Limits the amount of time an edit can live in the memstore. (Davaraj and LarsH)

Modified:
    hbase/branches/0.94/security/src/test/resources/hbase-site.xml
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/main/resources/hbase-default.xml
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
    hbase/branches/0.94/src/test/resources/hbase-site.xml

Modified: hbase/branches/0.94/security/src/test/resources/hbase-site.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/security/src/test/resources/hbase-site.xml?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/security/src/test/resources/hbase-site.xml (original)
+++ hbase/branches/0.94/security/src/test/resources/hbase-site.xml Wed May 15 19:23:45 2013
@@ -97,14 +97,6 @@
     </description>
   </property>
   <property>
-    <name>hbase.regionserver.optionalcacheflushinterval</name>
-    <value>1000</value>
-    <description>
-    Amount of time to wait since the last time a region was flushed before
-    invoking an optional cache flush. Default 60,000.
-    </description>
-  </property>
-  <property>
     <name>hbase.regionserver.safemode</name>
     <value>false</value>
     <description>

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
Wed May 15 19:23:45 2013
@@ -30,4 +30,12 @@ public interface FlushRequester {
    * @param region the HRegion requesting the cache flush
    */
   void requestFlush(HRegion region);
+
+  /**
+   * Tell the listener the cache needs to be flushed after a delay
+   *
+   * @param region the HRegion requesting the cache flush
+   * @param delay after how much time should the flush happen
+   */
+  void requestDelayedFlush(HRegion region, long delay);
 }
\ No newline at end of file

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed
May 15 19:23:45 2013
@@ -345,6 +345,7 @@ public class HRegion implements HeapSize
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
+  private long flushCheckInterval;
   private long blockingMemStoreSize;
   final long threadWakeFrequency;
   // Used to guard closes
@@ -452,6 +453,8 @@ public class HRegion implements HeapSize
     else {
       this.conf = new CompoundConfiguration().add(confParam);
     }
+    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+        DEFAULT_CACHE_FLUSH_INTERVAL);
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
 
@@ -953,6 +956,12 @@ public class HRegion implements HeapSize
 
   private final Object closeLock = new Object();
 
+  /** Conf key for the periodic flush interval */
+  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = 
+      "hbase.regionserver.optionalcacheflushinterval";
+  /** Default interval for the memstore flush */
+  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
+
   /**
    * Close down this HRegion.  Flush the cache unless abort parameter is true,
    * Shut down each HStore, don't service any more calls.
@@ -1475,6 +1484,29 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Should the memstore be flushed now
+   */
+  boolean shouldFlush() {
+    if (flushCheckInterval <= 0) { //disabled
+      return false;
+    }
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    //if we flushed in the recent past, we don't need to do again now
+    if ((now - getLastFlushTime() < flushCheckInterval)) {
+      return false;
+    }
+    //since we didn't flush in the recent past, flush now if certain conditions
+    //are met. Return true on first such memstore hit.
+    for (Store s : this.getStores().values()) {
+      if (s.timeOfOldestEdit() < now - flushCheckInterval) {
+        // we have an old enough edit in the memstore, flush
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Flush the memstore.
    *
    * Flushing the memstore is a little tricky. We have a lot of updates in the
@@ -5421,7 +5453,7 @@ public class HRegion implements HeapSize
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       36 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (7 * Bytes.SIZEOF_LONG) +
+      (8 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Wed May 15 19:23:45 2013
@@ -31,7 +31,6 @@ import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -210,7 +209,7 @@ public class HRegionServer implements HR
   private HFileSystem fs;
   private boolean useHBaseChecksum; // verify hbase checksums?
   private Path rootDir;
-  private final Random rand = new Random();
+  private final Random rand;
 
   //RegionName vs current action in progress
   //true - if open region action in progress
@@ -291,6 +290,11 @@ public class HRegionServer implements HR
    */
   Chore compactionChecker;
 
+  /*
+   * Check for flushes
+   */
+  Chore periodicFlusher;
+
   // HLog and HLog roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected volatile HLog hlog;
@@ -443,6 +447,8 @@ public class HRegionServer implements HR
     if (initialIsa.getAddress() == null) {
       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
     }
+
+    this.rand = new Random(initialIsa.hashCode());
     this.rpcServer = HBaseRPC.getServer(this,
       new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
         OnlineRegions.class},
@@ -697,6 +703,8 @@ public class HRegionServer implements HR
     this.compactionChecker = new CompactionChecker(this,
       this.threadWakeFrequency * multiplier, this);
 
+    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
+
     // Health checker thread.
     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
@@ -1348,6 +1356,36 @@ public class HRegionServer implements HR
     }
   }
 
+  class PeriodicMemstoreFlusher extends Chore {
+    final HRegionServer server;
+    final static int RANGE_OF_DELAY = 20000; //millisec
+    final static int MIN_DELAY_TIME = 3000; //millisec
+    public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
+      super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
+      this.server = server;
+    }
+
+    @Override
+    protected void chore() {
+      for (HRegion r : this.server.onlineRegions.values()) {
+        if (r == null)
+          continue;
+        if (r.shouldFlush()) {
+          FlushRequester requester = server.getFlushRequester();
+          if (requester != null) {
+            long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
+            LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString()
+ 
+                " after a delay of " + randomDelay);
+            //Throttle the flushes by putting a delay. If we don't throttle, and there
+            //is a balanced write-load on the regions in a table, we might end up 
+            //overwhelming the filesystem with too many flushes at once.
+            requester.requestDelayedFlush(r, randomDelay);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Report the status of the server. A server is online once all the startup is
    * completed (setting up filesystem, starting service threads, etc.). This
@@ -1659,6 +1697,8 @@ public class HRegionServer implements HR
         uncaughtExceptionHandler);
     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
       ".compactionChecker", uncaughtExceptionHandler);
+    Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
+        ".periodicFlusher", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
           uncaughtExceptionHandler);
@@ -1739,7 +1779,8 @@ public class HRegionServer implements HR
     // Verify that all threads are alive
     if (!(leases.isAlive()
         && cacheFlusher.isAlive() && hlogRoller.isAlive()
-        && this.compactionChecker.isAlive())) {
+        && this.compactionChecker.isAlive())
+        && this.periodicFlusher.isAlive()) {
       stop("One or more threads are no longer alive -- stop");
       return false;
     }
@@ -1916,6 +1957,7 @@ public class HRegionServer implements HR
    */
   protected void join() {
     Threads.shutdown(this.compactionChecker.getThread());
+    Threads.shutdown(this.periodicFlusher.getThread());
     Threads.shutdown(this.cacheFlusher.getThread());
     if (this.healthCheckChore != null) {
       Threads.shutdown(this.healthCheckChore.getThread());

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Wed
May 15 19:23:45 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * The MemStore holds in-memory modifications to the Store.  Modifications
@@ -87,6 +88,9 @@ public class MemStore implements HeapSiz
   // Used to track own heapSize
   final AtomicLong size;
 
+  // Used to track when to flush
+  volatile long timeOfOldestEdit = Long.MAX_VALUE;
+
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
 
@@ -158,6 +162,7 @@ public class MemStore implements HeapSiz
           if (allocator != null) {
             this.allocator = new MemStoreLAB(conf);
           }
+          timeOfOldestEdit = Long.MAX_VALUE;
         }
       }
     } finally {
@@ -217,6 +222,28 @@ public class MemStore implements HeapSiz
     }
   }
 
+  long timeOfOldestEdit() {
+    return timeOfOldestEdit;
+  }
+
+  private boolean addToKVSet(KeyValue e) {
+    boolean b = this.kvset.add(e);
+    setOldestEditTimeToNow();
+    return b;
+  }
+
+  private boolean removeFromKVSet(KeyValue e) {
+    boolean b = this.kvset.remove(e);
+    setOldestEditTimeToNow();
+    return b;
+  }
+
+  void setOldestEditTimeToNow() {
+    if (timeOfOldestEdit == Long.MAX_VALUE) {
+      timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
+    }
+  }
+
   /**
    * Internal version of add() that doesn't clone KVs with the
    * allocator, and doesn't take the lock.
@@ -224,7 +251,7 @@ public class MemStore implements HeapSiz
    * Callers should ensure they already have the read lock taken
    */
   private long internalAdd(final KeyValue toAdd) {
-    long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
+    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
     timeRangeTracker.includeTimestamp(toAdd);
     this.size.addAndGet(s);
     return s;
@@ -272,7 +299,7 @@ public class MemStore implements HeapSiz
       // If the key is in the memstore, delete it. Update this.size.
       found = this.kvset.get(kv);
       if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
-        this.kvset.remove(kv);
+        removeFromKVSet(kv);
         long s = heapSizeChange(kv, true);
         this.size.addAndGet(-s);
       }
@@ -291,7 +318,7 @@ public class MemStore implements HeapSiz
     this.lock.readLock().lock();
     try {
       KeyValue toAdd = maybeCloneWithAllocator(delete);
-      s += heapSizeChange(toAdd, this.kvset.add(toAdd));
+      s += heapSizeChange(toAdd, addToKVSet(toAdd));
       timeRangeTracker.includeTimestamp(toAdd);
     } finally {
       this.lock.readLock().unlock();
@@ -588,6 +615,7 @@ public class MemStore implements HeapSiz
           addedSize -= delta;
           this.size.addAndGet(-delta);
           it.remove();
+          setOldestEditTimeToNow();
         }
       } else {
         // past the column, done
@@ -899,7 +927,7 @@ public class MemStore implements HeapSiz
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (11 * ClassSize.REFERENCE));
+      ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
Wed May 15 19:23:45 2013
@@ -322,6 +322,18 @@ class MemStoreFlusher extends HasThread 
     }
   }
 
+  public void requestDelayedFlush(HRegion r, long delay) {
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.containsKey(r)) {
+        // This entry has some delay
+        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        fqe.requeue(delay);
+        this.regionsInQueue.put(r, fqe);
+        this.flushQueue.add(fqe);
+      }
+    }
+  }
+
   public int getFlushQueueSize() {
     return flushQueue.size();
   }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed
May 15 19:23:45 2013
@@ -513,6 +513,13 @@ public class Store extends SchemaConfigu
   }
 
   /**
+   * When was the oldest edit done in the memstore
+   */
+  public long timeOfOldestEdit() {
+    return memstore.timeOfOldestEdit();
+  }
+
+  /**
    * Adds a value to the memstore
    *
    * @param kv

Modified: hbase/branches/0.94/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/resources/hbase-default.xml?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/resources/hbase-default.xml (original)
+++ hbase/branches/0.94/src/main/resources/hbase-default.xml Wed May 15 19:23:45 2013
@@ -353,6 +353,14 @@
     </description>
   </property>
   <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>3600000</value>
+    <description>
+    Maximum amount of time an edit lives in memory before being automatically flushed.
+    Default 1 hour. Set it to 0 to disable automatic flushing.
+    </description>
+  </property>
+  <property>
     <name>hbase.hregion.memstore.flush.size</name>
     <value>134217728</value>
     <description>

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
Wed May 15 19:23:45 2013
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
@@ -39,6 +40,8 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
@@ -870,6 +873,99 @@ public class TestMemStore extends TestCa
     assertEquals(newSize, this.memstore.size.get());
   }
 
+  ////////////////////////////////////
+  // Test for periodic memstore flushes 
+  // based on time of oldest edit
+  ////////////////////////////////////
+
+  /**
+   * Tests that the timeOfOldestEdit is updated correctly for the 
+   * various edit operations in memstore.
+   * @throws Exception
+   */
+  public void testUpdateToTimeOfOldestEdit() throws Exception {
+    try {
+      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
+      EnvironmentEdgeManager.injectEdge(edge);
+      MemStore memstore = new MemStore();
+      long t = memstore.timeOfOldestEdit();
+      assertEquals(t, Long.MAX_VALUE);
+
+      // test the case that the timeOfOldestEdit is updated after a KV add
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      t = memstore.timeOfOldestEdit();
+      assertTrue(t == 1234);
+      // snapshot() will reset timeOfOldestEdit. The method will also assert the 
+      // value is reset to Long.MAX_VALUE
+      t = runSnapshot(memstore);
+
+      // test the case that the timeOfOldestEdit is updated after a KV delete
+      memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      t = memstore.timeOfOldestEdit();
+      assertTrue(t == 1234);
+      t = runSnapshot(memstore);
+
+      // test the case that the timeOfOldestEdit is updated after a KV upsert
+      List<KeyValue> l = new ArrayList<KeyValue>();
+      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
+      l.add(kv1);
+      memstore.upsert(l);
+      t = memstore.timeOfOldestEdit();
+      assertTrue(t == 1234);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  /**
+   * Tests the HRegion.shouldFlush method - adds an edit in the memstore
+   * and checks that shouldFlush returns true, and another where it disables
+   * the periodic flush functionality and tests whether shouldFlush returns
+   * false. 
+   * @throws Exception
+   */
+  public void testShouldFlush() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
+    checkShouldFlush(conf, true);
+    // test disable flush
+    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
+    checkShouldFlush(conf, false);
+  }
+
+  private void checkShouldFlush(Configuration conf, boolean expected) throws Exception {
+    try {
+      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
+      EnvironmentEdgeManager.injectEdge(edge);
+      HBaseTestingUtility hbaseUtility = new HBaseTestingUtility(conf);
+      HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor("foo"));
+
+      Map<byte[], Store> stores = region.getStores();
+      assertTrue(stores.size() == 1);
+
+      Store s = stores.entrySet().iterator().next().getValue();
+      edge.setCurrentTimeMillis(1234);
+      s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      edge.setCurrentTimeMillis(1234 + 100);
+      assertTrue(region.shouldFlush() == false);
+      edge.setCurrentTimeMillis(1234 + 10000);
+      assertTrue(region.shouldFlush() == expected);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
+    long t = 1234;
+    @Override
+    public long currentTimeMillis() {
+      return t; 
+    }
+    public void setCurrentTimeMillis(long t) {
+      this.t = t;
+    }
+  }
+
   /**   * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
    * @param hmc Instance to add rows to.
    * @return How many rows we added.
@@ -898,14 +994,17 @@ public class TestMemStore extends TestCa
     return ROW_COUNT;
   }
 
-  private void runSnapshot(final MemStore hmc) throws UnexpectedException {
+  private long runSnapshot(final MemStore hmc) throws UnexpectedException {
     // Save off old state.
     int oldHistorySize = hmc.getSnapshot().size();
     hmc.snapshot();
     KeyValueSkipListSet ss = hmc.getSnapshot();
     // Make some assertions about what just happened.
     assertTrue("History size has not increased", oldHistorySize < ss.size());
+    long t = memstore.timeOfOldestEdit();
+    assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
     hmc.clearSnapshot(ss);
+    return t;
   }
 
   private void isExpectedRowWithoutTimestamps(final int rowIndex,

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Wed May 15 19:23:45 2013
@@ -762,6 +762,12 @@ public class TestWALReplay {
         throw new RuntimeException("Exception flushing", e);
       }
     }
+
+    @Override
+    public void requestDelayedFlush(HRegion region, long when) {
+      // TODO Auto-generated method stub
+      
+    }
   }
 
   private void addWALEdits (final byte [] tableName, final HRegionInfo hri,

Modified: hbase/branches/0.94/src/test/resources/hbase-site.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/resources/hbase-site.xml?rev=1483022&r1=1483021&r2=1483022&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/resources/hbase-site.xml (original)
+++ hbase/branches/0.94/src/test/resources/hbase-site.xml Wed May 15 19:23:45 2013
@@ -97,14 +97,6 @@
     </description>
   </property>
   <property>
-    <name>hbase.regionserver.optionalcacheflushinterval</name>
-    <value>1000</value>
-    <description>
-    Amount of time to wait since the last time a region was flushed before
-    invoking an optional cache flush. Default 60,000.
-    </description>
-  </property>
-  <property>
     <name>hbase.regionserver.safemode</name>
     <value>false</value>
     <description>



Mime
View raw message