zookeeper-commits mailing list archives

From: iv...@apache.org
Subject: svn commit: r1430795 - in /zookeeper/bookkeeper/trunk: ./ doc/ hedwig-server/conf/ hedwig-server/src/main/java/org/apache/hedwig/server/common/ hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ hedwig-server/src/test/java/org/apache/hed...
Date: Wed, 09 Jan 2013 11:29:39 GMT
Author: ivank
Date: Wed Jan  9 11:29:39 2013
New Revision: 1430795

URL: http://svn.apache.org/viewvc?rev=1430795&view=rev
Log:
BOOKKEEPER-531: Cache thread should wait until old entries are collected (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/doc/hedwigParams.textile
    zookeeper/bookkeeper/trunk/hedwig-server/conf/hw_server.conf
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1430795&r1=1430794&r2=1430795&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jan  9 11:29:39 2013
@@ -220,6 +220,8 @@ Trunk (unreleased changes)
 
 	BOOKKEEPER-532: AbstractSubscriptionManager#AcquireOp read subscriptions every time even it already owned the topic. (sijie via fpj)
 
+        BOOKKEEPER-531: Cache thread should wait until old entries are collected (sijie via ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)

Modified: zookeeper/bookkeeper/trunk/doc/hedwigParams.textile
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/doc/hedwigParams.textile?rev=1430795&r1=1430794&r2=1430795&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/doc/hedwigParams.textile (original)
+++ zookeeper/bookkeeper/trunk/doc/hedwigParams.textile Wed Jan  9 11:29:39 2013
@@ -43,6 +43,7 @@ h3. Read-ahead cache parameters
 bq. Upon a range scan request for a given topic, two hints are provided as to when scanning should stop: the number of messages scanned and the total size of messages scanned. Scanning stops whenever one of these limits is exceeded.
 
 | @cache_size@ | Sets the size of the read-ahead cache. Default is the smallest of 2G or half the heap size. | 
+| @cache_entry_ttl@ | Sets the TTL for cache entries. Each time a new entry is added to the cache, expired entries are discarded. If the value is zero or negative, cache entries are not evicted until the cache is full or the messages have already been consumed. Default is 0. |
 | @scan_backoff_ms@ | The backoff time (in milliseconds) to retry scans after failures. Default value is 1s (1000ms). Default is 1s. |
 | @num_readahead_cache_threads@ | Sets the number of threads to be used for the read-ahead mechanism. Default is the number of cores as returned with a call to <code>Runtime.getRuntime().availableProcessors()</code>.|
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/conf/hw_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/conf/hw_server.conf?rev=1430795&r1=1430794&r2=1430795&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/conf/hw_server.conf (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/conf/hw_server.conf Wed Jan  9 11:29:39 2013
@@ -132,6 +132,13 @@ ssl_enabled=false
 # <code>Runtime.getRuntime().availableProcessors()</code>.
 # num_readahead_cache_threads=
 
+# Set the TTL for cache entries. Each time a new entry is added to the
+# cache, expired entries are discarded. If the value is set to zero or
+# a negative number, cache entries are not evicted until the cache is
+# full or the messages have already been consumed. By default the
+# value is zero.
+# cache_entry_ttl=
+
 ################################
 # Metadata Settings
 ################################

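For reference, enabling the new setting in hw_server.conf could look like the fragment below. The value is purely illustrative, and since the TTL is compared against MathUtils.now() in ReadAheadCache, the unit is assumed to be milliseconds:

    # illustrative only: drop cached entries that are more than one minute old
    cache_entry_ttl=60000
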
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1430795&r1=1430794&r2=1430795&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Wed Jan  9 11:29:39 2013
@@ -41,6 +41,7 @@ public class ServerConfiguration extends
     protected final static String READAHEAD_COUNT = "readahead_count";
     protected final static String READAHEAD_SIZE = "readahead_size";
     protected final static String CACHE_SIZE = "cache_size";
+    protected final static String CACHE_ENTRY_TTL = "cache_entry_ttl";
     protected final static String SCAN_BACKOFF_MSEC = "scan_backoff_ms";
     protected final static String SERVER_PORT = "server_port";
     protected final static String SSL_SERVER_PORT = "ssl_server_port";
@@ -175,6 +176,17 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Cache entry TTL. By default it is 0, meaning cache entries are not
+     * evicted until the cache is full or the messages have already been
+     * consumed. The TTL is only checked when adding a new entry to the cache.
+     *
+     * @return cache entry ttl.
+     */
+    public long getCacheEntryTTL() {
+        return conf.getLong(CACHE_ENTRY_TTL, 0L);
+    }
+
+    /**
      * After a scan of a log fails, how long before we retry (in msec)
      * 
      * @return long

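A minimal sketch of how the new getter is meant to be read (the variable names below are illustrative and not part of the commit): a TTL of zero or less disables age-based eviction, while a positive TTL marks an entry expired once its age exceeds the configured value.

    // sketch only; entryAddTime would come from CacheValue.getTimeOfAddition()
    long ttl = cfg.getCacheEntryTTL();
    boolean expired = ttl > 0 && (MathUtils.now() - entryAddTime) >= ttl;
    // ttl <= 0: the entry stays until the cache overflows or the message is consumed
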
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1430795&r1=1430794&r2=1430795&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Wed Jan  9 11:29:39 2013
@@ -34,6 +34,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hedwig.protocol.PubSubProtocol;
 import org.slf4j.Logger;
@@ -57,7 +58,7 @@ import org.apache.hedwig.server.jmx.Hedw
 import org.apache.hedwig.server.persistence.ReadAheadCacheBean;
 import org.apache.hedwig.util.Callback;
 
-public class ReadAheadCache implements PersistenceManager, Runnable, HedwigJMXService {
+public class ReadAheadCache implements PersistenceManager, HedwigJMXService {
 
     static Logger logger = LoggerFactory.getLogger(ReadAheadCache.class);
 
@@ -78,24 +79,31 @@ public class ReadAheadCache implements P
         new ConcurrentHashMap<CacheKey, CacheValue>();
 
     /**
-     * To simplify synchronization, the cache will be maintained by a single
-     * cache maintainer thread. This is the queue that will hold requests that
-     * need to be served by this thread
+     * We also want to track the entries in seq-id order so that we can clean up
+     * entries after the last subscriber
      */
-    protected BlockingQueue<CacheRequest> requestQueue = new LinkedBlockingQueue<CacheRequest>();
+    protected ConcurrentMap<ByteString, SortedSet<Long>> orderedIndexOnSeqId =
+        new ConcurrentHashMap<ByteString, SortedSet<Long>>();
 
     /**
-     * We want to keep track of when entries were added in the cache, so that we
-     * can remove them in a FIFO fashion
+     * Partition the cache into several segments to simplify synchronization.
+     * Each segment maintains its own time index and segment size.
      */
-    protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new TreeMap<Long, Set<CacheKey>>();
+    static class CacheSegment {
 
-    /**
-     * We also want to track the entries in seq-id order so that we can clean up
-     * entries after the last subscriber
-     */
-    protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId =
-        new HashMap<ByteString, SortedSet<Long>>();
+        /**
+         * We want to keep track of when entries were added in the cache, so that we
+         * can remove them in a FIFO fashion
+         */
+        protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new TreeMap<Long, Set<CacheKey>>();
+
+        /**
+         * We maintain an estimate of the current size of each cache segment,
+         * so that each thread knows when to evict entries from its segment.
+         */
+        protected AtomicLong presentSegmentSize = new AtomicLong(0);
+
+    }
 
     /**
      * We maintain an estimate of the current size of the cache, so that we know
@@ -104,6 +112,22 @@ public class ReadAheadCache implements P
     protected AtomicLong presentCacheSize = new AtomicLong(0);
 
     /**
+     * Num pending requests.
+     */
+    protected AtomicInteger numPendingRequests = new AtomicInteger(0);
+
+    /**
+     * Cache segment for different threads
+     */
+    protected final ThreadLocal<CacheSegment> cacheSegment =
+        new ThreadLocal<CacheSegment>() {
+            @Override
+            protected CacheSegment initialValue() {
+                return new CacheSegment();
+            }
+        };
+
+    /**
      * One instance of a callback that we will pass to the underlying
      * persistence manager when asking it to persist messages
      */
@@ -116,13 +140,14 @@ public class ReadAheadCache implements P
     protected ReadAheadException readAheadExceptionInstance = new ReadAheadException();
 
     protected ServerConfiguration cfg;
-    protected Thread cacheThread;
     // Boolean indicating if this thread should continue running. This is used
     // when we want to stop the thread during a PubSubServer shutdown.
     protected volatile boolean keepRunning = true;
 
     protected final OrderedSafeExecutor cacheWorkers;
-    protected final long maxCacheSize;
+    protected final int numCacheWorkers;
+    protected volatile long maxSegmentSize;
+    protected volatile long cacheEntryTTL;
 
     // JMX Beans
     ReadAheadCacheBean jmxCacheBean = null;
@@ -135,13 +160,23 @@ public class ReadAheadCache implements P
     public ReadAheadCache(PersistenceManagerWithRangeScan realPersistenceManager, ServerConfiguration cfg) {
         this.realPersistenceManager = realPersistenceManager;
         this.cfg = cfg;
-        cacheThread = new Thread(this, "CacheThread");
-        cacheWorkers = new OrderedSafeExecutor(cfg.getNumReadAheadCacheThreads());
-        maxCacheSize = cfg.getMaximumCacheSize();
+        numCacheWorkers = cfg.getNumReadAheadCacheThreads();
+        cacheWorkers = new OrderedSafeExecutor(numCacheWorkers);
+        reloadConf(cfg);
+    }
+
+    /**
+     * Reload configuration
+     *
+     * @param conf
+     *          Server configuration object
+     */
+    protected void reloadConf(ServerConfiguration cfg) {
+        maxSegmentSize = cfg.getMaximumCacheSize() / numCacheWorkers;
+        cacheEntryTTL = cfg.getCacheEntryTTL();
     }
 
     public ReadAheadCache start() {
-        cacheThread.start();
         return this;
     }
 
@@ -229,9 +264,11 @@ public class ReadAheadCache implements P
             return;
         }
         try {
+            numPendingRequests.incrementAndGet();
             cacheWorkers.submitOrdered(topic, new SafeRunnable() {
                 @Override
                 public void safeRun() {
+                    numPendingRequests.decrementAndGet();
                     obj.performRequest();
                 }
             });
@@ -241,22 +278,6 @@ public class ReadAheadCache implements P
     }
 
     /**
-     * Too complicated to deal with enqueue failures from the context of our
-     * callbacks. Its just simpler to quit and restart afresh. Moreover, this
-     * should not happen as the request queue for the cache maintainer is
-     * unbounded.
-     *
-     * @param obj
-     */
-    protected void enqueueWithoutFailure(CacheRequest obj) {
-        if (!requestQueue.offer(obj)) {
-            throw new UnexpectedError("Could not enqueue object: " + obj.toString()
-                                      + " to cache request queue. Exiting.");
-
-        }
-    }
-
-    /**
      * Another method from {@link PersistenceManager}.
      *
      * 2. Scan - Since the scan needs to touch the cache, we will just enqueue
@@ -275,7 +296,7 @@ public class ReadAheadCache implements P
      * message-ids older than the one specified
      */
     public void deliveredUntil(ByteString topic, Long seqId) {
-        enqueueWithoutFailure(new DeliveredUntil(topic, seqId));
+        enqueueWithoutFailureByTopic(topic, new DeliveredUntil(topic, seqId));
     }
 
     /**
@@ -304,31 +325,15 @@ public class ReadAheadCache implements P
     }
 
     /**
-     * ========================================================================
-     * BEGINNING OF CODE FOR THE CACHE MAINTAINER THREAD
-     *
-     * 1. The run method. It simply dequeues from the request queue, checks the
-     * type of object and acts accordingly
-     */
-    public void run() {
-        while (keepRunning) {
-            CacheRequest obj;
-            try {
-                obj = requestQueue.take();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                return;
-            }
-            obj.performRequest();
-        }
-
-    }
-
-    /**
-     * Stop method which will enqueue a ShutdownCacheRequest.
+     * Stop the readahead cache.
      */
     public void stop() {
-        enqueueWithoutFailure(new ShutdownCacheRequest());
+        try {
+            keepRunning = false;
+            cacheWorkers.shutdown();
+        } catch (Exception e) {
+            logger.warn("Failed to shut down cache workers : ", e);
+        }
     }
 
     /**
@@ -533,31 +538,28 @@ public class ReadAheadCache implements P
             }
         }
 
+        CacheSegment segment = cacheSegment.get();
+        int size = message.getBody().size();
+
         // update the cache size
-        final long newCacheSize = presentCacheSize.addAndGet(message.getBody().size());
+        segment.presentSegmentSize.addAndGet(size);
+        presentCacheSize.addAndGet(size);
 
         synchronized (cacheValue) {
             // finally add the message to the cache
             cacheValue.setMessageAndInvokeCallbacks(message, currTime);
         }
 
-        // if overgrown, collect old entries
-        enqueueWithoutFailure(new CacheRequest() {
-            @Override
-            public void performRequest() {
-                // maintain the index of seq-id
-                MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
-                                         cacheKey.getSeqId(), TreeSetLongFactory.instance);
-
-                // maintain the time index of addition
-                MapMethods.addToMultiMap(timeIndexOfAddition, currTime,
-                                         cacheKey, HashSetCacheKeyFactory.instance);
-                // update time index
-                if (newCacheSize > maxCacheSize) {
-                    collectOldCacheEntries();
-                }
-            }
-        });
+        // maintain the index of seq-id
+        // no lock since threads are partitioned by topics
+        MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
+                                 cacheKey.getSeqId(), TreeSetLongFactory.instance);
+
+        // maintain the time index of addition
+        MapMethods.addToMultiMap(segment.timeIndexOfAddition, currTime,
+                                 cacheKey, HashSetCacheKeyFactory.instance);
+
+        collectOldOrExpiredCacheEntries(segment);
     }
 
     protected void removeMessageFromCache(final CacheKey cacheKey, Exception exception,
@@ -569,6 +571,8 @@ public class ReadAheadCache implements P
             return;
         }
 
+        CacheSegment segment = cacheSegment.get();
+
         long timeOfAddition = 0;
         synchronized (cacheValue) {
             if (cacheValue.isStub()) {
@@ -578,26 +582,19 @@ public class ReadAheadCache implements P
                 return;
             }
 
-            presentCacheSize.addAndGet(0 - cacheValue.getMessage().getBody().size());
+            int size = 0 - cacheValue.getMessage().getBody().size();
+            presentCacheSize.addAndGet(size);
+            segment.presentSegmentSize.addAndGet(size);
             timeOfAddition = cacheValue.getTimeOfAddition();
         }
 
-        // maintain the 2 indexes lazily
-        if (maintainSeqIdIndex || maintainTimeIndex) {
-            final long additionTime = timeOfAddition;
-            enqueueWithoutFailure(new CacheRequest() {
-                @Override
-                public void performRequest() {
-                    if (maintainSeqIdIndex) {
-                        MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
-                                                      cacheKey.getSeqId());
-                    }
-                    if (maintainTimeIndex) {
-                        MapMethods.removeFromMultiMap(timeIndexOfAddition, additionTime,
-                                                      cacheKey);
-                    }
-                }
-            });
+        if (maintainSeqIdIndex) {
+            MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
+                                          cacheKey.getSeqId());
+        }
+        if (maintainTimeIndex) {
+            MapMethods.removeFromMultiMap(segment.timeIndexOfAddition,
+                                          timeOfAddition, cacheKey);
         }
     }
 
@@ -605,30 +602,45 @@ public class ReadAheadCache implements P
      * Collection of old entries is simple. Just collect in insert-time order,
      * oldest to newest.
      */
-    protected void collectOldCacheEntries() {
-        while (presentCacheSize.get() > cfg.getMaximumCacheSize () &&
-               !timeIndexOfAddition.isEmpty()) {
-            Long earliestTime = timeIndexOfAddition.firstKey();
-            Set<CacheKey> oldCacheEntries = timeIndexOfAddition.get(earliestTime);
-
-            // Note: only concrete cache entries, and not stubs are in the time
-            // index. Hence there can be no callbacks pending on these cache
-            // entries. Hence safe to remove them directly.
-            for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); iter.hasNext();) {
-                final CacheKey cacheKey = iter.next();
-
-                logger.debug("Removing {} from cache because it's the oldest.", cacheKey);
-                removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
-                                       // maintainTimeIndex=
-                                       false,
-                                       // maintainSeqIdIndex=
-                                       true);
+    protected void collectOldOrExpiredCacheEntries(CacheSegment segment) {
+        if (cacheEntryTTL > 0) {
+            // clear expired entries
+            while (!segment.timeIndexOfAddition.isEmpty()) {
+                Long earliestTime = segment.timeIndexOfAddition.firstKey();
+                if (MathUtils.now() - earliestTime < cacheEntryTTL) {
+                    break;
+                }
+                collectCacheEntriesAtTimestamp(segment, earliestTime);
             }
+        }
 
-            timeIndexOfAddition.remove(earliestTime);
+        while (segment.presentSegmentSize.get() > maxSegmentSize &&
+               !segment.timeIndexOfAddition.isEmpty()) {
+            Long earliestTime = segment.timeIndexOfAddition.firstKey();
+            collectCacheEntriesAtTimestamp(segment, earliestTime);
         }
     }
 
+    private void collectCacheEntriesAtTimestamp(CacheSegment segment, long timestamp) {
+        Set<CacheKey> oldCacheEntries = segment.timeIndexOfAddition.get(timestamp);
+
+        // Note: only concrete cache entries, and not stubs are in the time
+        // index. Hence there can be no callbacks pending on these cache
+        // entries. Hence safe to remove them directly.
+        for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); iter.hasNext();) {
+            final CacheKey cacheKey = iter.next();
+
+            logger.debug("Removing {} from cache because it's the oldest.", cacheKey);
+            removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
+                                   // maintainTimeIndex=
+                                   false,
+                                   // maintainSeqIdIndex=
+                                   true);
+        }
+
+        segment.timeIndexOfAddition.remove(timestamp);
+    }
+
     /**
      * ========================================================================
      * The rest is just simple wrapper classes.
@@ -775,16 +787,6 @@ public class ReadAheadCache implements P
         }
     }
 
-    protected class ShutdownCacheRequest implements CacheRequest {
-        // This is a simple type of CacheRequest we will enqueue when
-        // the PubSubServer is shut down and we want to stop the ReadAheadCache
-        // thread.
-        public void performRequest() {
-            keepRunning = false;
-            cacheWorkers.shutdown();
-        }
-    }
-
     @Override
     public void registerJMX(HedwigMBeanInfo parent) {
         try {

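The key structural change above is that the single cache-maintainer thread and its unbounded request queue are gone: work is submitted to topic-ordered workers, and each worker thread keeps its own CacheSegment in a ThreadLocal, so the per-segment time index needs no locking. A stripped-down, self-contained sketch of that pattern (illustrative class and field names, not the Hedwig classes, with plain JDK executors standing in for OrderedSafeExecutor):

    import java.util.TreeMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicLong;

    public class SegmentedCacheSketch {

        static class Segment {
            // Only ever touched by the one worker thread that owns it, so no locking.
            final TreeMap<Long, String> timeIndexOfAddition = new TreeMap<Long, String>();
            final AtomicLong size = new AtomicLong(0);
        }

        // One single-threaded executor per worker; a topic always hashes to the same
        // worker, so all cache operations for a topic run in order on one thread.
        private final ExecutorService[] workers;
        private final ThreadLocal<Segment> segment = new ThreadLocal<Segment>() {
            @Override
            protected Segment initialValue() {
                return new Segment();
            }
        };

        public SegmentedCacheSketch(int numWorkers) {
            workers = new ExecutorService[numWorkers];
            for (int i = 0; i < numWorkers; i++) {
                workers[i] = Executors.newSingleThreadExecutor();
            }
        }

        public void addToCache(final String topic, final String payload) {
            int idx = (topic.hashCode() & 0x7fffffff) % workers.length;
            workers[idx].submit(new Runnable() {
                public void run() {
                    Segment s = segment.get();   // this thread's private segment
                    s.timeIndexOfAddition.put(System.currentTimeMillis(), topic + "/" + payload);
                    s.size.addAndGet(payload.length());
                    // age- and size-based eviction would walk s.timeIndexOfAddition here,
                    // analogously to collectOldOrExpiredCacheEntries() in the patch
                }
            });
        }
    }
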
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java?rev=1430795&r1=1430794&r2=1430795&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java Wed Jan  9 11:29:39 2013
@@ -58,7 +58,7 @@ public class ReadAheadCacheBean implemen
 
     @Override
     public int getNumPendingCacheRequests() {
-        return cache.requestQueue.size();
+        return cache.numPendingRequests.get();
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=1430795&r1=1430794&r2=1430795&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Wed Jan  9 11:29:39 2013
@@ -50,16 +50,11 @@ public class TestReadAheadCacheWhiteBox 
         }
 
         @Override
-        protected void enqueueWithoutFailure(CacheRequest obj) {
-            // make it perform in the same thread
-            obj.performRequest();
-        }
-
-        @Override
         protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) {
             // make it perform in the same thread
             obj.performRequest();
         }
+
     }
 
     class MyServerConfiguration extends ServerConfiguration {
@@ -68,7 +63,8 @@ public class TestReadAheadCacheWhiteBox 
         // the count limit
         int readAheadCount = NUM_MESSAGES / 2;
         long readAheadSize = (long) (MSG_SIZE * 2.5);
-        long maxCacheSize = Long.MAX_VALUE;
+        long maxCacheSize = Integer.MAX_VALUE;
+        long cacheEntryTTL = 0L;
 
         @Override
         public int getReadAheadCount() {
@@ -84,6 +80,12 @@ public class TestReadAheadCacheWhiteBox 
         public long getMaximumCacheSize() {
             return maxCacheSize;
         }
+
+        @Override
+        public long getCacheEntryTTL() {
+            return cacheEntryTTL;
+        }
+
     }
 
     @Before
@@ -167,7 +169,8 @@ public class TestReadAheadCacheWhiteBox 
         cacheBasedPersistenceManager.deliveredUntil(topic, (long) messages.size());
         // should have no effect
         assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
+        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
+                   .timeIndexOfAddition.isEmpty());
         assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
         assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize.get());
 
@@ -245,7 +248,8 @@ public class TestReadAheadCacheWhiteBox 
         assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L));
 
         CacheValue value = cacheBasedPersistenceManager.cache.get(key);
-        assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.get(value.timeOfAddition).contains(key));
+        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
+                   .timeIndexOfAddition.get(value.timeOfAddition).contains(key));
     }
 
     @Test(timeout=60000)
@@ -255,7 +259,8 @@ public class TestReadAheadCacheWhiteBox 
         cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true);
         assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
         assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
+        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
+                   .timeIndexOfAddition.isEmpty());
     }
 
     @Test(timeout=60000)
@@ -268,9 +273,36 @@ public class TestReadAheadCacheWhiteBox 
         }
 
         int n = 2;
-        myConf.maxCacheSize = n * MSG_SIZE;
-        cacheBasedPersistenceManager.collectOldCacheEntries();
+        myConf.maxCacheSize = n * MSG_SIZE * myConf.getNumReadAheadCacheThreads();
+        cacheBasedPersistenceManager.reloadConf(myConf);
+        cacheBasedPersistenceManager.collectOldOrExpiredCacheEntries(
+                cacheBasedPersistenceManager.cacheSegment.get());
+        assertEquals(n, cacheBasedPersistenceManager.cache.size());
+        assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get()
+                     .timeIndexOfAddition.size());
+    }
+
+    @Test(timeout=60000)
+    public void testCollectExpiredCacheEntries() throws Exception {
+        int i = 1;
+        int n = 2;
+        long ttl = 5000L;
+        myConf.cacheEntryTTL = ttl;
+        long curTime = MathUtils.now();
+        cacheBasedPersistenceManager.reloadConf(myConf);
+        for (Message m : messages) {
+            CacheKey key = new CacheKey(topic, i);
+            cacheBasedPersistenceManager.addMessageToCache(key, m, curTime++);
+            if (i == NUM_MESSAGES - n) {
+                Thread.sleep(2 * ttl);
+                curTime += 2 * ttl;
+            }
+            i++;
+        }
+
         assertEquals(n, cacheBasedPersistenceManager.cache.size());
-        assertEquals(n, cacheBasedPersistenceManager.timeIndexOfAddition.size());
+        assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get()
+                     .timeIndexOfAddition.size());
     }
+
 }


