commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t.@apache.org
Subject svn commit: r1778931 - in /commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine: CacheEventQueue.java PooledCacheEventQueue.java
Date Sun, 15 Jan 2017 18:01:19 GMT
Author: tv
Date: Sun Jan 15 18:01:19 2017
New Revision: 1778931

URL: http://svn.apache.org/viewvc?rev=1778931&view=rev
Log:
Derive CacheEventQueue from the pooled implementation, just use pool of size 1

Modified:
    commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
    commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java

Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java?rev=1778931&r1=1778930&r2=1778931&view=diff
==============================================================================
--- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
(original)
+++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
Sun Jan 15 18:01:19 2017
@@ -1,9 +1,5 @@
 package org.apache.commons.jcs.engine;
 
-import java.util.ArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -23,36 +19,22 @@ import java.util.concurrent.TimeUnit;
  * under the License.
  */
 
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.jcs.engine.behavior.ICacheListener;
-import org.apache.commons.jcs.engine.stats.StatElement;
-import org.apache.commons.jcs.engine.stats.Stats;
-import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
-import org.apache.commons.jcs.engine.stats.behavior.IStats;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
 
 /**
  * An event queue is used to propagate ordered cache events to one and only one target listener.
- * <p>
- * This is a modified version of the experimental version. It should lazy initialize the
processor
- * thread, and kill the thread if the queue goes empty for a specified period, now set to
1 minute.
- * If something comes in after that a new processor thread should be created.
  */
 public class CacheEventQueue<K, V>
-    extends AbstractCacheEventQueue<K, V>
+    extends PooledCacheEventQueue<K, V>
 {
-    /** The logger. */
-    private static final Log log = LogFactory.getLog( CacheEventQueue.class );
-
     /** The type of queue -- there are pooled and single */
     private static final QueueType queueType = QueueType.SINGLE;
 
-    /** the thread that works the queue. */
-    private Thread processorThread;
-
-    /** Queue implementation */
-    private LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>();
-
     /**
      * Constructs with the specified listener and the cache name.
      * <p>
@@ -77,205 +59,46 @@ public class CacheEventQueue<K, V>
     public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String
cacheName, int maxFailure,
                             int waitBeforeRetry )
     {
-        initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry );
-    }
-
-    /**
-     * What type of queue is this.
-     * <p>
-     * @return queueType
-     */
-    @Override
-    public QueueType getQueueType()
-    {
-        return queueType;
-    }
-
-    /**
-     * Kill the processor thread and indicate that the queue is destroyed and no longer alive,
but it
-     * can still be working.
-     */
-    protected void stopProcessing()
-    {
-        setAlive(false);
-        processorThread = null;
-    }
-
-    /**
-     * Event Q is empty.
-     * <p>
-     * Calling destroy interrupts the processor thread.
-     */
-    @Override
-    public void destroy()
-    {
-        if ( isAlive() )
-        {
-            setAlive(false);
-
-            if ( log.isInfoEnabled() )
-            {
-                log.info( "Destroying queue, stats =  " + getStatistics() );
-            }
-
-            if ( processorThread != null )
-            {
-                processorThread.interrupt();
-                processorThread = null;
-            }
-
-            if ( log.isInfoEnabled() )
-            {
-                log.info( "Cache event queue destroyed: " + this );
-            }
-        }
-        else
-        {
-            if ( log.isInfoEnabled() )
-            {
-                log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats
=  " + getStatistics() );
-            }
-        }
-    }
-
-    /**
-     * Adds an event to the queue.
-     * <p>
-     * @param event
-     */
-    @Override
-    protected void put( AbstractCacheEvent event )
-    {
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "Event entering Queue for " + getCacheName() + ": " + event );
-        }
-
-        queue.offer(event);
-
-        if ( isWorking() )
-        {
-            if ( !isAlive() )
-            {
-                setAlive(true);
-                processorThread = new QProcessor();
-                processorThread.start();
-                if ( log.isInfoEnabled() )
-                {
-                    log.info( "Cache event queue created: " + this );
-                }
-            }
-        }
+        super( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null );
     }
 
-    // /////////////////////////// Inner classes /////////////////////////////
-
     /**
-     * This is the thread that works the queue.
+     * Initializes the queue.
      * <p>
-     * @author asmuts
-     * @created January 15, 2002
-     */
-    protected class QProcessor
-        extends Thread
-    {
-        /**
-         * Constructor for the QProcessor object
-         * <p>
-         * @param aQueue the event queue to take items from.
-         */
-        QProcessor()
-        {
-            super( "CacheEventQueue.QProcessor-" + getCacheName() );
-            setDaemon( true );
-        }
-
-        /**
-         * Main processing method for the QProcessor object.
-         * <p>
-         * Waits for a specified time (waitToDieMillis) for something to come in and if no
new
-         * events come in during that period the run method can exit and the thread is dereferenced.
-         */
-        @Override
-        public void run()
-        {
-
-            while ( CacheEventQueue.this.isAlive() )
-            {
-                AbstractCacheEvent event = null;
-
-                try
-                {
-                    event = queue.poll(getWaitToDieMillis(), TimeUnit.MILLISECONDS);
-                }
-                catch (InterruptedException e)
-                {
-                    // is ok
-                }
-
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "Event from queue = " + event );
-                }
-
-                if ( event == null )
-                {
-                    stopProcessing();
-                }
-
-                if ( event != null && isWorking() && CacheEventQueue.this.isAlive()
)
-                {
-                    event.run();
-                }
-            }
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "QProcessor exiting for " + getCacheName() );
-            }
-        }
-    }
-
-    /**
-     * This method returns semi-structured data on this queue.
-     * <p>
-     * @see org.apache.commons.jcs.engine.behavior.ICacheEventQueue#getStatistics()
-     * @return information on the status and history of the queue
+     * @param listener
+     * @param listenerId
+     * @param cacheName
+     * @param maxFailure
+     * @param waitBeforeRetry
+     * @param threadPoolName
      */
     @Override
-    public IStats getStatistics()
+    protected void initialize( ICacheListener<K, V> listener, long listenerId, String
cacheName, int maxFailure,
+                            int waitBeforeRetry, String threadPoolName )
     {
-        IStats stats = new Stats();
-        stats.setTypeName( "Cache Event Queue" );
-
-        ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
-
-        elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(this.isWorking())
) );
-        elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive())
) );
-        elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty())
) );
-        elems.add(new StatElement<Integer>( "Size", Integer.valueOf(this.size()) )
);
-
-        stats.setStatElements( elems );
+        super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
 
-        return stats;
-    }
+        // create a default pool with one worker thread to mimic the SINGLE queue behavior
+        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
 
-    /**
-     * @return whether there are any items in the queue.
-     */
-    @Override
-    public boolean isEmpty()
-    {
-        return queue.isEmpty();
+        pool = new ThreadPoolExecutor(
+            0,
+            1,
+            getWaitToDieMillis(),
+            TimeUnit.MILLISECONDS,
+            queue,
+            new DaemonThreadFactory("CacheEventQueue.QProcessor-" + getCacheName()));
+        setAlive(true);
     }
 
     /**
-     * Returns the number of elements in the queue.
+     * What type of queue is this.
      * <p>
-     * @return number of items in the queue.
+     * @return queueType
      */
     @Override
-    public int size()
+    public QueueType getQueueType()
     {
-        return queue.size();
+        return queueType;
     }
 }

Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java?rev=1778931&r1=1778930&r2=1778931&view=diff
==============================================================================
--- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
(original)
+++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
Sun Jan 15 18:01:19 2017
@@ -52,7 +52,7 @@ public class PooledCacheEventQueue<K, V>
     private static final QueueType queueType = QueueType.POOLED;
 
     /** The Thread Pool to execute events with. */
-    private ThreadPoolExecutor pool = null;
+    protected ThreadPoolExecutor pool = null;
 
     /**
      * Constructor for the CacheEventQueue object
@@ -88,6 +88,7 @@ public class PooledCacheEventQueue<K, V>
         // this will share the same pool with other event queues by default.
         pool = ThreadPoolManager.getInstance().getPool(
                 (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
+        setAlive(true);
     }
 
     /**



Mime
View raw message