jakarta-jcs-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Smuts, Aaron" <aaro...@amazon.com>
Subject RE: Thread deadlock in CacheEventQueue class
Date Tue, 04 Jan 2005 20:31:03 GMT
I managed to get it to happen in a unit test and verified that the change fixes the problem.
 I'll put it in tonight.

The runPutDelayTest method below (registered in the suite as testRunPutDelayTest) can
reproduce it.  It is very tricky to get the timing just right.



package org.apache.jcs.engine;

/*
 * Copyright 2001-2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License")
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.io.Serializable;

import org.apache.jcs.JCS;
import org.apache.jcs.TestDiskCacheConcurrent;
import org.apache.jcs.engine.behavior.ICacheElement;
import org.apache.jcs.engine.behavior.ICacheListener;

import junit.extensions.ActiveTestSuite;
import junit.framework.Test;
import junit.framework.TestCase;

/**
 * This test case is designed to make sure there are no deadlocks in the event
 * queue. The time to live should be set to a very short interval to make a
 * deadlock more likely.
 * 
 * @author Aaron Smuts
 */
public class TestEventQueueConcurrent extends TestCase
{

    /** Event queue under test; shared by every test in the suite. */
    private static CacheEventQueue queue = null;

    /** Listener that counts the events delivered by the queue. */
    private static CacheListenerImpl listen = null;

    /** Passed to the CacheEventQueue constructor as its max-failure argument. */
    private final int maxFailure = 3;

    /** Passed to the CacheEventQueue constructor as its wait-before-retry argument. */
    private final int waitBeforeRetry = 100;

    // very small idle time
    private final int idleTime = 2;

    /**
     * Constructor for the TestEventQueueConcurrent object.
     * 
     * @param testName
     *            name of the test, handed to the JUnit TestCase constructor
     */
    public TestEventQueueConcurrent(String testName)
    {
        super(testName);
    }

    /**
     * Main method passes this test to the text test runner.
     * 
     * @param args
     *            ignored
     */
    public static void main(String[] args)
    {
        String[] testCaseName =
        { TestEventQueueConcurrent.class.getName() };
        junit.textui.TestRunner.main(testCaseName);
    }

    /**
     * A unit test suite for JUnit. Every test is added to an ActiveTestSuite
     * and overrides runTest() to call one of the run*Test methods below.
     * 
     * @return The test suite
     */
    public static Test suite()
    {

        ActiveTestSuite suite = new ActiveTestSuite();

        suite.addTest(new TestEventQueueConcurrent("testRunPutTest1")
        {
            public void runTest() throws Exception
            {
                this.runPutTest(200, 200);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunPutTest2")
        {
            public void runTest() throws Exception
            {
                this.runPutTest(1200, 1400);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunRemoveTest1")
        {
            public void runTest() throws Exception
            {
                this.runRemoveTest(2200);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testStopProcessing1")
        {
            public void runTest() throws Exception
            {
                this.runStopProcessingTest();
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunPutTest4")
        {
            public void runTest() throws Exception
            {
                this.runPutTest(5200, 6600);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunRemoveTest2")
        {
            public void runTest() throws Exception
            {
                this.runRemoveTest(5200);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testStopProcessing2")
        {
            public void runTest() throws Exception
            {
                this.runStopProcessingTest();
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunPutDelayTest")
        {
            public void runTest() throws Exception
            {
                this.runPutDelayTest(100, 6700);
            }
        });

        return suite;
    }

    /**
     * Test setup. Creates the static listener and queue and sets the queue's
     * wait-to-die interval to the very small idle time.
     * <p>
     * NOTE(review): JUnit calls setUp() on every test instance, so each test
     * in the suite replaces the shared static queue and listener. Since the
     * suite is an ActiveTestSuite (tests run in separate threads), a test may
     * end up using objects created by another test's setUp - confirm this is
     * intended.
     */
    public void setUp()
    {
        listen = new CacheListenerImpl();
        queue = new CacheEventQueue(listen, 1L, "testCache1", maxFailure, waitBeforeRetry);

        queue.setWaitToDieMillis(idleTime);
    }

    /**
     * Adds put events to the queue, waits for the queue to drain and then
     * checks that the listener saw at least the expected number of puts.
     * 
     * @param end
     *            index of the last element to put; elements 0..end inclusive
     *            are added
     * @param expectedPutCount
     *            minimum put count the listener must have recorded
     * @throws Exception
     */
    public void runPutTest(int end, int expectedPutCount) throws Exception
    {
        for (int i = 0; i <= end; i++)
        {
            CacheElement elem = new CacheElement("testCache1", i + ":key", i + "data");
            queue.addPutEvent(elem);
        }

        waitForEmptyQueue("queue is still busy, waiting 250 millis");
        System.out.println("queue is empty, comparing putCount");

        // this becomes less accurate with each test. It should never fail. If
        // it does things are very off.
        assertMinimumPutCount(expectedPutCount);
    }

    /**
     * Add remove events to the event queue.
     * 
     * @param end
     *            index of the last key to remove; keys 0..end inclusive are
     *            submitted
     * @throws Exception
     */
    public void runRemoveTest(int end) throws Exception
    {
        for (int i = 0; i <= end; i++)
        {
            queue.addRemoveEvent(i + ":key");
        }

    }

    /**
     * Calls stopProcessing() on the event queue.
     * 
     * @throws Exception
     */
    public void runStopProcessingTest() throws Exception
    {
        queue.stopProcessing();
    }

    /**
     * Test putting and a delay. Waits until queue is empty to start.
     * <p>
     * After one initial put, events are added with alternating pauses of
     * idleTime and half of idleTime between them. idleTime is also the
     * wait-to-die interval configured on the queue in setUp(), so the puts
     * arrive close to the moment that interval elapses.
     * 
     * @param end
     *            index of the last delayed element; elements 0..end inclusive
     *            are added after the initial one
     * @param expectedPutCount
     *            minimum put count the listener must have recorded
     * @throws Exception
     */
    public void runPutDelayTest(int end, int expectedPutCount) throws Exception
    {
        waitForEmptyQueue("queue is busy, waiting 250 millis to begin");
        System.out.println("queue is empty, begin");

        // get it going
        CacheElement elem = new CacheElement("testCache1", "key", "data");
        queue.addPutEvent(elem);

        for (int i = 0; i <= end; i++)
        {
            // Object.wait(0) means "wait until notified", and nothing ever
            // notifies this monitor, so the pause must never collapse to 0
            // (it would if idleTime were lowered to 1).
            long pauseMillis = (i % 2 == 0) ? idleTime : idleTime / 2;
            pauseMillis = Math.max(1, pauseMillis);

            synchronized (this)
            {
                this.wait(pauseMillis);
            }
            CacheElement elem2 = new CacheElement("testCache1", i + ":key", i + "data");
            queue.addPutEvent(elem2);
        }

        waitForEmptyQueue("queue is still busy, waiting 250 millis");
        System.out.println("queue is empty, comparing putCount");

        // this becomes less accurate with each test. It should never fail. If
        // it does things are very off.
        assertMinimumPutCount(expectedPutCount);
    }

    /**
     * Polls the queue every 250 milliseconds until it reports that it is
     * empty, printing the given message before each pause.
     * 
     * @param busyMessage
     *            text printed each time the queue is found non-empty
     * @throws Exception
     */
    private void waitForEmptyQueue(String busyMessage) throws Exception
    {
        while (!queue.isEmpty())
        {
            synchronized (this)
            {
                System.out.println(busyMessage);
                this.wait(250);
            }
        }
    }

    /**
     * Asserts that the listener has recorded at least the given number of
     * puts.
     * 
     * @param expectedPutCount
     *            minimum acceptable put count
     */
    private void assertMinimumPutCount(int expectedPutCount)
    {
        // Read the counter once, under the listener's lock, so the value in
        // the failure message is the value that was actually compared.
        int actualPutCount = listen.getPutCount();
        assertTrue("The put count [" + actualPutCount
            + "] is below the expected minimum threshold", actualPutCount >= expectedPutCount);
    }

    /**
     * This is a dummy cache listener to use when testing the event queue. It
     * only counts put and remove events; all other callbacks do nothing.
     */
    private static class CacheListenerImpl implements ICacheListener
    {

        /** Number of handlePut calls; guarded by this listener's monitor. */
        private int putCount = 0;

        /** Number of handleRemove calls; guarded by this listener's monitor. */
        private int removeCount = 0;

        /**
         * Returns the number of put events handled so far. Synchronized on
         * the same monitor that handlePut uses for the increment.
         * 
         * @return the current put count
         */
        synchronized int getPutCount()
        {
            return putCount;
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handlePut(org.apache.jcs.engine.behavior.ICacheElement)
         */
        public void handlePut(ICacheElement item) throws IOException
        {
            synchronized (this)
            {
                putCount++;
            }
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
         *      java.io.Serializable)
         */
        public void handleRemove(String cacheName, Serializable key) throws IOException
        {
            synchronized (this)
            {
                removeCount++;
            }

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
         */
        public void handleRemoveAll(String cacheName) throws IOException
        {
            // intentionally empty: not exercised by these tests

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
         */
        public void handleDispose(String cacheName) throws IOException
        {
            // intentionally empty: not exercised by these tests

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#setListenerId(long)
         */
        public void setListenerId(long id) throws IOException
        {
            // intentionally empty: the id is not stored

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#getListenerId()
         */
        public long getListenerId() throws IOException
        {
            // no id is stored, so a fixed value is returned
            return 0;
        }

    }
}

-----Original Message-----
From: Wyatt, Allen [mailto:Allen.Wyatt@travelocity.com] 
Sent: Monday, January 03, 2005 1:47 PM
To: turbine-jcs-user@jakarta.apache.org
Subject: Thread deadlock in CacheEventQueue class

I've encountered a deadlock in the org.apache.jcs.engine.CacheEventQueue
class.  One thread synchronizes on the queueLock object and then tries synchronizing on the
CacheEventQueue object itself while another thread synchronizes on the CacheEventQueue object
and then tries synchronizing on the queueLock object.  Here are the stacks of the threads:

Thread #1:
----------
at org.apache.jcs.engine.CacheEventQueue.put(CacheEventQueue.java:299) - waiting to lock <0x546eaac0> (a java.lang.Object (the queueLock object))
at org.apache.jcs.engine.CacheEventQueue.addPutEvent(CacheEventQueue.java:211) - locked <0x546ea628> (a org.apache.jcs.engine.CacheEventQueue)
at org.apache.jcs.auxiliary.disk.AbstractDiskCache.update(AbstractDiskCache.java:148)
at org.apache.jcs.engine.control.CompositeCache.spoolToDisk(CompositeCache.java:346)
at org.apache.jcs.engine.memory.AbstractMemoryCache.waterfal(AbstractMemoryCache.java:230)
at org.apache.jcs.engine.memory.shrinking.ShrinkerThread.shrink(ShrinkerThread.java:247)
at org.apache.jcs.engine.memory.shrinking.ShrinkerThread.run(ShrinkerThread.java:119)

Thread #2 (running the Qprocessor inner class):
----------
at org.apache.jcs.engine.CacheEventQueue.stopProcessing(CacheEventQueue.java:126) - waiting to lock <0x546ea628> (a org.apache.jcs.engine.CacheEventQueue)
at org.apache.jcs.engine.CacheEventQueue$QProcessor.run(CacheEventQueue.java:454) - locked <0x546eaac0> (a java.lang.Object (the queueLock object))

Is this a known problem?  Is there a fix?  I tried looking at cvs.apache.org/viewcvs/jakarta-turbine-jcs/.../CacheEventQueue.java
and the code doesn't have a fix as far as I can tell.

I was thinking this could be fixed by changing the code in the QProcessor inner class's run()
method from:

    /**
     * Main loop of the QProcessor thread, as it stood when the deadlock was
     * reported.
     * <p>
     * While the queue is alive it takes the next event and runs it. When no
     * event is available it waits on queueLock for up to
     * queue.getWaitToDieMillis() and takes once more; if there is still no
     * event it calls queue.stopProcessing().
     * <p>
     * Per the thread dump quoted in this message, stopProcessing() has to
     * lock the CacheEventQueue object, while addPutEvent() holds that lock
     * and then waits for queueLock inside put(). Because stopProcessing() is
     * called below while queueLock is still held, the two threads take the
     * same two locks in opposite order and can deadlock.
     */
    public void run()
    {
        AbstractCacheEvent r = null;

        while ( queue.isAlive() )
        {
            r = queue.take();
    
            if ( log.isDebugEnabled() )
            {
                log.debug( "Event from queue = " + r );
            }

            // Nothing to process right now: linger for a while before giving up.
            if ( r == null )
            {
                synchronized ( queueLock )
                {
                    try
                    {
                        queueLock.wait( queue.getWaitToDieMillis() );
                    }
                    catch ( InterruptedException e )
                    {
                        log.warn(
                            "Interrupted while waiting for another event to come in before
we die." );
                        return;
                    }
                    // Second attempt after the wait.
                    r = queue.take();
                    if ( log.isDebugEnabled() )
                    {
                        log.debug( "Event from queue after sleep = " + r );
                    }
                    if ( r == null )
                    {
                        // Called with queueLock still held: this is the
                        // lock-order inversion described in the message.
                        queue.stopProcessing();
                    }
                }
            }

            if ( queue.isWorking() && queue.isAlive() && r != null )
            {
                r.run();
            }
        }
        if ( log.isInfoEnabled() )
        {
            log.info( "QProcessor exiting for " + queue );
        }
    }

to the following code:

    /**
     * Main loop of the QProcessor thread with the proposed deadlock fix.
     * <p>
     * The loop is the same as before: take an event and run it; when none is
     * available, wait on queueLock for up to queue.getWaitToDieMillis() and
     * take once more. The only change is that the "still nothing, stop
     * processing" check now happens after the synchronized block, so
     * queue.stopProcessing() is no longer called while queueLock is held.
     */
    public void run()
    {
        AbstractCacheEvent r = null;

        while ( queue.isAlive() )
        {
            r = queue.take();
    
            if ( log.isDebugEnabled() )
            {
                log.debug( "Event from queue = " + r );
            }

            // Nothing to process right now: linger for a while before giving up.
            if ( r == null )
            {
                synchronized ( queueLock )
                {
                    try
                    {
                        queueLock.wait( queue.getWaitToDieMillis() );
                    }
                    catch ( InterruptedException e )
                    {
                        log.warn(
                            "Interrupted while waiting for another event to come in before
we die." );
                        return;
                    }
                    // Second attempt after the wait.
                    r = queue.take();
                    if ( log.isDebugEnabled() )
                    {
                        log.debug( "Event from queue after sleep = " + r );
                    }
                    /*** MOVED CODE FROM HERE (inside synchronized
block) TO BELOW (outside synchronized block) ***/
                }
                /*** MOVED CODE STARTS BELOW: ****/
                // queueLock has been released by the time this runs.
                if ( r == null )
                {
                    queue.stopProcessing();
                }
                /*** END OF MOVED CODE ****/
            }

            if ( queue.isWorking() && queue.isAlive() && r != null )
            {
                r.run();
            }
        }
        if ( log.isInfoEnabled() )
        {
            log.info( "QProcessor exiting for " + queue );
        }
    }

Does this sound reasonable?

---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-user-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-user-help@jakarta.apache.org


Mime
View raw message