Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 9185 invoked from network); 17 Aug 2006 13:07:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 17 Aug 2006 13:07:47 -0000 Received: (qmail 96814 invoked by uid 500); 17 Aug 2006 13:07:47 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 96769 invoked by uid 500); 17 Aug 2006 13:07:47 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 96760 invoked by uid 99); 17 Aug 2006 13:07:47 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Aug 2006 06:07:47 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Aug 2006 06:07:47 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 6B7341A9823; Thu, 17 Aug 2006 06:07:26 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r432224 - in /incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra: ServerSessionImpl.java ServerSessionPoolImpl.java Date: Thu, 17 Aug 2006 13:07:25 -0000 To: activemq-commits@geronimo.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060817130726.6B7341A9823@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: rajdavies Date: Thu Aug 17 06:07:24 2006 New Revision: 432224 URL: http://svn.apache.org/viewvc?rev=432224&view=rev Log: removed areas of contention (deadlocks) Modified: incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java Modified: incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java?rev=432224&r1=432223&r2=432224&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java (original) +++ incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java Thu Aug 17 06:07:24 2006 @@ -155,7 +155,7 @@ /** * @see java.lang.Runnable#run() */ - synchronized public void run() { + public void run() { log.debug("Running"); while (true) { log.debug("run loop start"); Modified: incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java?rev=432224&r1=432223&r2=432224&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java (original) +++ incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java Thu Aug 17 06:07:24 2006 @@ -35,6 +35,8 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** * @version $Revision$ $Date$ @@ -46,9 +48,9 @@ private final ActiveMQEndpointWorker activeMQAsfEndpointWorker; private final int maxSessions; - private ArrayList idleSessions = new ArrayList(); - private LinkedList activeSessions = new LinkedList(); - private boolean closing = false; + private List idleSessions = new CopyOnWriteArrayList(); + private List activeSessions = new CopyOnWriteArrayList(); + private AtomicBoolean closing = new AtomicBoolean(false); public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) { this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker; @@ -92,15 +94,15 @@ /** */ - synchronized public ServerSession getServerSession() throws JMSException { + public ServerSession getServerSession() throws JMSException { log.debug("ServerSession requested."); - if (closing) { + if (closing.get()) { throw new JMSException("Session Pool Shutting Down."); } if (idleSessions.size() > 0) { ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1); - activeSessions.addLast(ss); + activeSessions.add(ss); log.debug("Using idle session: " + ss); return ss; } else { @@ -121,7 +123,7 @@ return getExistingServerSession(); } - activeSessions.addLast(ss); + activeSessions.add(ss); log.debug("Created a new session: " + ss); return ss; } @@ -154,20 +156,22 @@ * @return */ private ServerSession getExistingServerSession() { - ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst(); - activeSessions.addLast(ss); + ServerSessionImpl ss = (ServerSessionImpl) activeSessions.remove(0); + activeSessions.add(ss); log.debug("Reusing an active session: " + ss); return ss; } - synchronized public void returnToPool(ServerSessionImpl ss) { + public void returnToPool(ServerSessionImpl ss) { log.debug("Session returned to pool: " + ss); activeSessions.remove(ss); idleSessions.add(ss); - notify(); + synchronized(closing){ + closing.notify(); + } } - synchronized public void removeFromPool(ServerSessionImpl ss) { + public void removeFromPool(ServerSessionImpl ss) { activeSessions.remove(ss); try { ActiveMQSession session = (ActiveMQSession) ss.getSession(); @@ -179,16 +183,19 @@ log.error("Error redispatching unconsumed messages from stale session", t); } ss.close(); - notify(); + synchronized(closing){ + closing.notify(); + } } public void close() { - synchronized (this) { - closing = true; + synchronized (closing) { + closing.set(true); closeIdleSessions(); while( activeSessions.size() > 0 ) { + System.out.println("ACtive Sessions = " + activeSessions.size()); try { - wait(); + closing.wait(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; @@ -209,14 +216,14 @@ * @return Returns the closing. */ public boolean isClosing(){ - return closing; + return closing.get(); } /** * @param closing The closing to set. */ public void setClosing(boolean closing){ - this.closing=closing; + this.closing.set(closing); } }