qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1659605 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: ./ util/
Date Fri, 13 Feb 2015 17:01:59 GMT
Author: kwall
Date: Fri Feb 13 17:01:59 2015
New Revision: 1659605

URL: http://svn.apache.org/r1659605
Log:
QPID-6374: [Java Broker] 0-10 Failover: the thread performing the failover prep now syncs
the dispatch queue (avoids possibility of app level dead lock)

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri
Feb 13 17:01:59 2015
@@ -1216,11 +1216,6 @@ public class AMQConnection extends Close
         return _failoverMutex;
     }
 
-    public void failoverPrep()
-    {
-        _delegate.failoverPrep();
-    }
-
     public void resubscribeSessions() throws JMSException, AMQException, FailoverException
     {
         _delegate.resubscribeSessions();
@@ -1653,4 +1648,46 @@ public class AMQConnection extends Close
     {
         return _messageCompressionThresholdSize;
     }
+
+    void doWithAllLocks(Runnable r)
+    {
+        doWithAllLocks(r, _sessions.values());
+
+    }
+
+    private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
+    {
+        if (!sessions.isEmpty())
+        {
+            AMQSession session = sessions.remove(0);
+
+            final Object dispatcherLock = session.getDispatcherLock();
+            if (dispatcherLock != null)
+            {
+                synchronized (dispatcherLock)
+                {
+                    synchronized (session.getMessageDeliveryLock())
+                    {
+                        doWithAllLocks(r, sessions);
+                    }
+                }
+            }
+            else
+            {
+                synchronized (session.getMessageDeliveryLock())
+                {
+                    doWithAllLocks(r, sessions);
+                }
+            }
+        }
+        else
+        {
+            synchronized (getFailoverMutex())
+            {
+                r.run();
+            }
+        }
+    }
+
+
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
Fri Feb 13 17:01:59 2015
@@ -52,8 +52,6 @@ public interface AMQConnectionDelegate
 
     XASession createXASession(int ackMode) throws JMSException;
 
-    void failoverPrep();
-
     void resubscribeSessions() throws JMSException, AMQException, FailoverException;
 
     void closeConnection(long timeout) throws JMSException, AMQException;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Fri Feb 13 17:01:59 2015
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
@@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10
         List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
         for (AMQSession s : sessions)
         {
-            s.failoverPrep();
+            ((AMQSession_0_10)s).failoverPrep();
         }
     }
 
@@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10
 
             _qpidConnection.notifyFailoverRequired();
 
-            synchronized (_conn.getFailoverMutex())
+            final AtomicBoolean failoverDone = new AtomicBoolean();
+
+            _conn.doWithAllLocks(new Runnable()
             {
+                @Override
+                public void run()
+                {
                 try
                 {
                     if (_conn.firePreFailover(false) && _conn.attemptReconnection())
                     {
-                        _conn.failoverPrep();
+                        failoverPrep();
                         _conn.resubscribeSessions();
                         _conn.fireFailoverComplete();
-                        return;
+                        failoverDone.set(true);
                     }
                 }
                 catch (Exception e)
@@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10
                     _conn.getProtocolHandler().getFailoverLatch().countDown();
                     _conn.getProtocolHandler().setFailoverLatch(null);
                 }
+
+                }
+            });
+
+
+            if (failoverDone.get())
+            {
+                return;
             }
+
         }
 
+
         _conn.setClosed();
 
         final ExceptionListener listener = _conn.getExceptionListenerNoCheck();

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Fri Feb 13 17:01:59 2015
@@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 i
         }
     }
 
-    public void failoverPrep()
-    {
-        // do nothing
-    }
-
     /**
      * For all sessions, and for all consumers in those sessions, resubscribe. This is called
during failover handling.
      * The caller must hold the failover mutex before calling this method.

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Feb
13 17:01:59 2015
@@ -169,7 +169,7 @@ public abstract class AMQSession<C exten
     private final Lock _subscriberDetails = new ReentrantLock(true);
     private final Lock _subscriberAccess = new ReentrantLock(true);
 
-    private final FlowControllingBlockingQueue _queue;
+    private final FlowControllingBlockingQueue<Dispatchable> _queue;
 
     private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
     private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -358,7 +358,7 @@ public abstract class AMQSession<C exten
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue =
-                    new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
+                    new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark,
_prefetchLowMark,
                                                      new FlowControllingBlockingQueue.ThresholdListener()
                                                      {
                                                          private final AtomicBoolean _suspendState
= new AtomicBoolean();
@@ -423,7 +423,7 @@ public abstract class AMQSession<C exten
         }
         else
         {
-            _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
+            _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark,
null);
         }
 
         // Add creation logging to tie in with the existing close logging
@@ -1789,7 +1789,7 @@ public abstract class AMQSession<C exten
             //in the pre-dispatch queue.
             _usingDispatcherForCleanup = true;
 
-            syncDispatchQueue();
+            syncDispatchQueue(false);
 
             // Set to false before sending the recover as 0-8/9/9-1 will
             //send messages back before the recover completes, and we
@@ -1881,7 +1881,7 @@ public abstract class AMQSession<C exten
 
                 setRollbackMark();
 
-                syncDispatchQueue();
+                syncDispatchQueue(false);
 
                 _dispatcher.rollback();
 
@@ -2201,21 +2201,17 @@ public abstract class AMQSession<C exten
 
     }
 
-    void failoverPrep()
-    {
-        syncDispatchQueue();
-    }
 
-    void syncDispatchQueue()
+    void syncDispatchQueue(final boolean holdDispatchLock)
     {
-        if (Thread.currentThread() == _dispatcherThread)
+        if (Thread.currentThread() == _dispatcherThread || holdDispatchLock)
         {
             while (!super.isClosed() && !_queue.isEmpty())
             {
                 Dispatchable disp;
                 try
                 {
-                    disp = (Dispatchable) _queue.take();
+                    disp = _queue.take();
                 }
                 catch (InterruptedException e)
                 {
@@ -2267,7 +2263,7 @@ public abstract class AMQSession<C exten
                 Dispatchable disp;
                 try
                 {
-                    disp = (Dispatchable) _queue.take();
+                    disp = _queue.take();
                 }
                 catch (InterruptedException e)
                 {
@@ -3086,7 +3082,7 @@ public abstract class AMQSession<C exten
 
     private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
     {
-        Iterator messages = _queue.iterator();
+        Iterator<Dispatchable> messages = _queue.iterator();
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag
+ ") (PDispatchQ) requeue:"
@@ -3237,6 +3233,12 @@ public abstract class AMQSession<C exten
 
     public abstract void setFlowControl(final boolean active);
 
+    Object getDispatcherLock()
+    {
+        Dispatcher dispatcher = _dispatcher;
+        return dispatcher == null ? null : dispatcher._lock;
+    }
+
     public interface Dispatchable
     {
         void dispatch(AMQSession ssn);
@@ -3389,10 +3391,18 @@ public abstract class AMQSession<C exten
 
             try
             {
-                Dispatchable disp;
-                while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get())
+
+                while (((_queue.blockingPeek()) != null) && !_closed.get())
                 {
-                    disp.dispatch(AMQSession.this);
+                    synchronized (_lock)
+                    {
+                        Dispatchable disp = _queue.nonBlockingTake();
+
+                        if(disp != null)
+                        {
+                            disp.dispatch(AMQSession.this);
+                        }
+                    }
                 }
             }
             catch (InterruptedException e)

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Fri Feb 13 17:01:59 2015
@@ -235,7 +235,7 @@ public class AMQSession_0_10 extends AMQ
 
     void failoverPrep()
     {
-        super.failoverPrep();
+        syncDispatchQueue(true);
         clearUnacked();
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Fri Feb 13 17:01:59 2015
@@ -412,7 +412,7 @@ public class BasicMessageConsumer_0_10 e
                                                 _capacity,
                                                 Option.UNRELIABLE);
             }
-            _0_10session.syncDispatchQueue();
+            _0_10session.syncDispatchQueue(false);
             o = super.getMessageFromQueue(-1);
         }
         if (_capacity == 0)

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
Fri Feb 13 17:01:59 2015
@@ -22,13 +22,16 @@ package org.apache.qpid.client;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public final class ChannelToSessionMap
 {
-    private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>();
+    private final Map<Integer, AMQSession> _sessionMap = Collections.synchronizedMap(new
LinkedHashMap<Integer, AMQSession>());
     private AtomicInteger _idFactory = new AtomicInteger(0);
     private int _maxChannelID;
     private int _minChannelID;
@@ -48,7 +51,7 @@ public final class ChannelToSessionMap
         _sessionMap.remove(channelId);
     }
 
-    public Collection<AMQSession> values()
+    public List<AMQSession> values()
     {
         return new ArrayList<>(_sessionMap.values());
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1659605&r1=1659604&r2=1659605&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
(original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
Fri Feb 13 17:01:59 2015
@@ -20,13 +20,13 @@
  */
 package org.apache.qpid.client.util;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A blocking queue that emits events above a user specified threshold allowing the caller
to take action (e.g. flow
  * control) to try to prevent the queue growing (much) further. The underlying queue itself
is not bounded therefore the
@@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLi
  * <p>
  * TODO  Make this implement java.util.Queue and hide the implementation. Then different
queue types can be substituted.
  */
-public class FlowControllingBlockingQueue
+public class FlowControllingBlockingQueue<T>
 {
 	private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
 	
     /** This queue is bounded and is used to store messages before being dispatched to the
consumer */
-    private final Queue _queue = new ConcurrentLinkedQueue();
+    private final Queue<T> _queue = new ConcurrentLinkedQueue<T>();
 
     private final int _flowControlHighThreshold;
     private final int _flowControlLowThreshold;
@@ -82,9 +82,44 @@ public class FlowControllingBlockingQueu
         }
     }
 
-    public Object take() throws InterruptedException
+    public T blockingPeek() throws InterruptedException
+    {
+        T o = _queue.peek();
+        if (o == null)
+        {
+            synchronized (this)
+            {
+                while ((o = _queue.peek()) == null)
+                {
+                    wait();
+                }
+            }
+        }
+        return o;
+    }
+
+    public T nonBlockingTake() throws InterruptedException
+    {
+        T o = _queue.poll();
+
+        if (o != null && !disableFlowControl && _listener != null)
+        {
+            synchronized (_listener)
+            {
+                if (_count-- == _flowControlLowThreshold)
+                {
+                    _listener.underThreshold(_count);
+                }
+            }
+
+        }
+
+        return o;
+    }
+
+    public T take() throws InterruptedException
     {
-        Object o = _queue.poll();
+        T o = _queue.poll();
         if(o == null)
         {
             synchronized(this)
@@ -110,7 +145,7 @@ public class FlowControllingBlockingQueu
         return o;
     }
 
-    public void add(Object o)
+    public void add(T o)
     {
         synchronized(this)
         {
@@ -130,7 +165,7 @@ public class FlowControllingBlockingQueu
         }
     }
 
-    public Iterator iterator()
+    public Iterator<T> iterator()
     {
         return _queue.iterator();
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message