activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r637028 - /activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
Date Fri, 14 Mar 2008 09:53:40 GMT
Author: rajdavies
Date: Fri Mar 14 02:53:39 2008
New Revision: 637028

URL: http://svn.apache.org/viewvc?rev=637028&view=rev
Log:
tidied up synchronization

Modified:
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java?rev=637028&r1=637027&r2=637028&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
(original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
Fri Mar 14 02:53:39 2008
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.ra;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
@@ -45,8 +45,8 @@
     private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
     private final int maxSessions;
 
-    private List<ServerSessionImpl> idleSessions = new CopyOnWriteArrayList<ServerSessionImpl>();
-    private List<ServerSessionImpl> activeSessions = new CopyOnWriteArrayList<ServerSessionImpl>();
+    private List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
+    private List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
     private AtomicBoolean closing = new AtomicBoolean(false);
 
     public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions)
{
@@ -76,7 +76,9 @@
         } catch (UnavailableException e) {
             // The container could be limiting us on the number of endpoints
             // that are being created.
-            LOG.debug("Could not create an endpoint.", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Could not create an endpoint.", e);
+            }
             session.close();
             return null;
         }
@@ -92,17 +94,30 @@
     /**
      */
     public ServerSession getServerSession() throws JMSException {
-        LOG.debug("ServerSession requested.");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("ServerSession requested.");
+        }
         if (closing.get()) {
             throw new JMSException("Session Pool Shutting Down.");
         }
 
-        if (idleSessions.size() > 0) {
-            ServerSessionImpl ss = idleSessions.remove(idleSessions.size() - 1);
-            activeSessions.add(ss);
-            LOG.debug("Using idle session: " + ss);
+        ServerSessionImpl ss = null;
+        synchronized (idleSessions) {
+            if (idleSessions.size() > 0) {
+                ss = idleSessions.remove(idleSessions.size() - 1);
+            }
+        }
+        if (ss != null) {
+            synchronized (activeSessions) {
+                activeSessions.add(ss);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Using idle session: " + ss);
+            }
             return ss;
-        } else {
+        }
+
+        synchronized (activeSessions) {
             // Are we at the upper limit?
             if (activeSessions.size() >= maxSessions) {
                 // then reuse the already created sessions..
@@ -110,66 +125,97 @@
                 // processing.
                 return getExistingServerSession();
             }
-            ServerSessionImpl ss = createServerSessionImpl();
-            // We may not be able to create a session due to the container
-            // restricting us.
-            if (ss == null) {
-                if (activeSessions.size() == 0) {
-                    //no idle sessions, no active sessions, and we can't create a new session....
-                    throw new JMSException("Endpoint factory did not allow creation of any
endpoints.");
-                }
+        }
 
-                return getExistingServerSession();
+        ss = createServerSessionImpl();
+        // We may not be able to create a session due to the container
+        // restricting us.
+        if (ss == null) {
+            synchronized (activeSessions) {
+                if (activeSessions.isEmpty()) {
+                    throw new JMSException(
+                            "Endpoint factory did not allow creation any endpoints.");
+                }
             }
+
+            return getExistingServerSession();
+        }
+        synchronized (activeSessions) {
             activeSessions.add(ss);
+        }
+        if (LOG.isDebugEnabled()) {
             LOG.debug("Created a new session: " + ss);
-            return ss;
         }
+        return ss;
+
     }
 
     /**
-     * @param messageDispatch the message to dispatch
+     * @param messageDispatch
+     *            the message to dispatch
      * @throws JMSException
      */
-    private void dispatchToSession(MessageDispatch messageDispatch) throws JMSException {
+    private void dispatchToSession(MessageDispatch messageDispatch)
+            throws JMSException {
 
         ServerSession serverSession = getServerSession();
         Session s = serverSession.getSession();
         ActiveMQSession session = null;
         if (s instanceof ActiveMQSession) {
-            session = (ActiveMQSession)s;
+            session = (ActiveMQSession) s;
         } else if (s instanceof ActiveMQQueueSession) {
-            session = (ActiveMQSession)s;
+            session = (ActiveMQSession) s;
         } else if (s instanceof ActiveMQTopicSession) {
-            session = (ActiveMQSession)s;
+            session = (ActiveMQSession) s;
         } else {
-            activeMQAsfEndpointWorker.connection.onAsyncException(new JMSException("Session
pool provided an invalid session type: " + s.getClass()));
+            activeMQAsfEndpointWorker.connection
+                    .onAsyncException(new JMSException(
+                            "Session pool provided an invalid session type: "
+                                    + s.getClass()));
         }
         session.dispatch(messageDispatch);
         serverSession.start();
     }
 
     /**
-     * @return
+     * @return session
      */
     private ServerSession getExistingServerSession() {
-        ServerSessionImpl ss = activeSessions.remove(0);
-        activeSessions.add(ss);
-        LOG.debug("Reusing an active session: " + ss);
+        ServerSessionImpl ss = null;
+        if (!activeSessions.isEmpty()) {
+            if (activeSessions.size() > 1) {
+                // round robin
+                ss = activeSessions.remove(0);
+                activeSessions.add(ss);
+            } else {
+                ss = activeSessions.get(0);
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Reusing an active session: " + ss);
+        }
         return ss;
     }
 
     public void returnToPool(ServerSessionImpl ss) {
-        LOG.debug("Session returned to pool: " + ss);
-        activeSessions.remove(ss);
-        idleSessions.add(ss);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Session returned to pool: " + ss);
+        }
+        synchronized(activeSessions) {
+            activeSessions.remove(ss);
+        }
+        synchronized(idleSessions) {
+            idleSessions.add(ss);
+        }
         synchronized (closing) {
             closing.notify();
         }
     }
 
     public void removeFromPool(ServerSessionImpl ss) {
-        activeSessions.remove(ss);
+        synchronized(activeSessions) {
+            activeSessions.remove(ss);
+        }
         try {
             ActiveMQSession session = (ActiveMQSession)ss.getSession();
             List l = session.getUnconsumedMessages();
@@ -186,26 +232,35 @@
     }
 
     public void close() {
-        synchronized (closing) {
-            closing.set(true);
-            closeIdleSessions();
-            while (activeSessions.size() > 0) {
-                LOG.debug("Active Sessions = " + activeSessions.size());
-                try {
-                    closing.wait(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    return;
+        closing.set(true);
+        closeIdleSessions();
+        // we may have to wait erroneously 250ms if an
+        // active session is removed during our wait and we
+        // are not notified
+        while (getActiveSessionSize() > 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Active Sessions = " + getActiveSessionSize());
+            }
+            try {
+                synchronized (closing) {
+                    closing.wait(250);
                 }
-                closeIdleSessions();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
             }
+            closeIdleSessions();
         }
     }
+    
 
     private void closeIdleSessions() {
-        for (Iterator<ServerSessionImpl> iter = idleSessions.iterator(); iter.hasNext();)
{
-            ServerSessionImpl ss = iter.next();
-            ss.close();
+        synchronized(idleSessions) {
+            for (Iterator<ServerSessionImpl> iter = idleSessions.iterator(); iter.hasNext();)
{
+                ServerSessionImpl ss = iter.next();
+                ss.close();
+            }
+            idleSessions.clear();
         }
     }
 
@@ -215,12 +270,18 @@
     public boolean isClosing() {
         return closing.get();
     }
-
+    
     /**
      * @param closing The closing to set.
      */
     public void setClosing(boolean closing) {
         this.closing.set(closing);
+    }
+    
+    private int getActiveSessionSize() {
+        synchronized(activeSessions) {
+            return activeSessions.size();
+        }
     }
 
 }



Mime
View raw message