qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1544129 - in /qpid/trunk/qpid/java: amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
Date Thu, 21 Nov 2013 11:31:30 GMT
Author: rgodfrey
Date: Thu Nov 21 11:31:30 2013
New Revision: 1544129

URL: http://svn.apache.org/r1544129
Log:
QPID-5344 : Deadlock in JMS AMQP 1.0 client - patch from David Ingham

Modified:
    qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1544129&r1=1544128&r2=1544129&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
(original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
Thu Nov 21 11:31:30 2013
@@ -175,43 +175,48 @@ public class ConnectionImpl implements C
                 connect();
                 started = true;
             }
+        }
 
-            try
+        try
+        {
+            SessionImpl session = new SessionImpl(this, acknowledgeMode);
+            session.setQueueSession(_isQueueConnection);
+            session.setTopicSession(_isTopicConnection);
+            
+            boolean connectionStarted = false;
+            synchronized(_lock)
             {
-                SessionImpl session = new SessionImpl(this, acknowledgeMode);
-                session.setQueueSession(_isQueueConnection);
-                session.setTopicSession(_isTopicConnection);
+                checkClosed();
                 _sessions.add(session);
-
-                if(_state == State.STARTED)
-                {
-                    session.start();
-                }
-
-                return session;
+                connectionStarted = _state == State.STARTED;
             }
-            catch(JMSException e)
+            
+            if(connectionStarted)
             {
-                Error remoteError;
-                if(started
-                   && e.getLinkedException() instanceof ConnectionErrorException
-                   && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition()
== ConnectionError.REDIRECT)
-                {
-                    String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host"));
-                    int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port"));
-                    String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname"));
-                    reconnect(networkHost,port,hostName);
-                    return createSession(acknowledgeMode);
-
-                }
-                else
-                {
-                    throw e;
-                }
+                session.start();
             }
+            
+            return session;
         }
+        catch(JMSException e)
+        {
+            Error remoteError;
+            if(started
+               && e.getLinkedException() instanceof ConnectionErrorException
+               && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition()
== ConnectionError.REDIRECT)
+            {
+                String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host"));
+                int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port"));
+                String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname"));
+                reconnect(networkHost,port,hostName);
+                return createSession(acknowledgeMode);
 
-
+            }
+            else
+            {
+                throw e;
+            }
+        }
     }
 
     void removeSession(SessionImpl session)
@@ -272,6 +277,7 @@ public class ConnectionImpl implements C
 
     public void start() throws JMSException
     {
+        List<SessionImpl> stoppedSessions = null;
         synchronized(_lock)
         {
             checkClosed();
@@ -281,30 +287,30 @@ public class ConnectionImpl implements C
                 // TODO
 
                 _state = State.STARTED;
-
-                for(SessionImpl session : _sessions)
-                {
-                    session.start();
-                }
-
+                stoppedSessions = new ArrayList<SessionImpl>(_sessions);
             }
 
             _lock.notifyAll();
         }
 
+        if (stoppedSessions != null)
+        {
+            for(SessionImpl session : stoppedSessions)
+            {
+                session.start();
+            }
+        }
     }
 
     public void stop() throws JMSException
     {
+        List<SessionImpl> startedSessions = null;
         synchronized(_lock)
         {
             switch(_state)
             {
                 case STARTED:
-                    for(SessionImpl session : _sessions)
-                    {
-                        session.stop();
-                    }
+                    startedSessions = new ArrayList<SessionImpl>(_sessions);
                 case UNCONNECTED:
                     _state = State.STOPPED;
                     break;
@@ -314,6 +320,14 @@ public class ConnectionImpl implements C
 
             _lock.notifyAll();
         }
+        
+        if (startedSessions != null)
+        {
+            for(SessionImpl session : startedSessions)
+            {
+                session.stop();
+            }
+        }
     }
 
 
@@ -341,39 +355,34 @@ public class ConnectionImpl implements C
 
     public void close() throws JMSException
     {
-        Object outerLock;
-        if(_conn != null)
-        {
-            outerLock = _conn.getEndpoint().getLock();
-        }
-        else
+        List<SessionImpl> sessions = null;
+        List<CloseTask> closeTasks = null;
+        boolean closeConnection = false;
+        synchronized(_lock)
         {
-            outerLock = _lock;
+            if(_state != State.CLOSED)
+            {
+                _state = State.CLOSED;
+                sessions = new ArrayList<SessionImpl>(_sessions);
+                closeTasks = new ArrayList<CloseTask>(_closeTasks);
+                closeConnection = _conn != null && _state != State.UNCONNECTED;
+            }
+            
+            _lock.notifyAll();
         }
-
-        synchronized (outerLock)
+        
+        if (sessions != null)
         {
-            synchronized(_lock)
+            for(SessionImpl session : sessions)
             {
-                if(_state != State.CLOSED)
-                {
-                    stop();
-                    List<SessionImpl> sessions = new ArrayList<SessionImpl>(_sessions);
-                    for(SessionImpl session : sessions)
-                    {
-                        session.close();
-                    }
-                    for(CloseTask task : _closeTasks)
-                    {
-                        task.onClose();
-                    }
-                    if(_conn != null && _state != State.UNCONNECTED ) {
-                        _conn.close();
-                    }
-                    _state = State.CLOSED;
-                }
-
-                _lock.notifyAll();
+                session.close();
+            }
+            for(CloseTask task : closeTasks)
+            {
+                task.onClose();
+            }
+            if(closeConnection) {
+                _conn.close();
             }
         }
     }

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1544129&r1=1544128&r2=1544129&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
(original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
Thu Nov 21 11:31:30 2013
@@ -114,35 +114,38 @@ public class Session
 
     }
 
-    public synchronized SendingLinkEndpoint createSendingLinkEndpoint(final String linkName,
-                                                                      final Target target,
-                                                                      final Source source,
-                                                                      AcknowledgeMode mode,
-                                                                      Map<Binary, Outcome>
unsettled,
-                                                                      final DeliveryStateHandler
deliveryStateHandler)
-    {
-        SendingLinkEndpoint link = this.getEndpoint().createSendingLinkEndpoint(linkName,
source, target,
-                                                                                unsettled,
deliveryStateHandler);
-
-        switch(mode)
-        {
-            case ALO:
-            	link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-            	link.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
-                break;
-            case AMO:
-            	link.setSendingSettlementMode(SenderSettleMode.SETTLED);
-                break;
-            case EO:
-            	link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-            	link.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
-                break;
-
-        }
-        
-        link.attach();
-        
-    	return link;
+    public SendingLinkEndpoint createSendingLinkEndpoint(final String linkName,
+                                                         final Target target,
+                                                         final Source source,
+                                                         AcknowledgeMode mode,
+                                                         Map<Binary, Outcome> unsettled,
+                                                         final DeliveryStateHandler deliveryStateHandler)
+    {
+    	SessionEndpoint endpoint = this.getEndpoint();
+    	synchronized(endpoint.getLock())
+    	{
+            SendingLinkEndpoint link = endpoint.createSendingLinkEndpoint(linkName, source,
target,
+                                                                          unsettled, deliveryStateHandler);
+            
+            switch(mode)
+            {
+                case ALO:
+                	link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                	link.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+                    break;
+                case AMO:
+                	link.setSendingSettlementMode(SenderSettleMode.SETTLED);
+                    break;
+                case EO:
+                	link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                	link.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+                    break;
+                    
+            }
+            
+            link.attach();
+        	return link;
+    	}
     }
 
     public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message