qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1226382 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/logging/subjects/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid...
Date Mon, 02 Jan 2012 10:01:22 GMT
Author: rgodfrey
Date: Mon Jan  2 10:01:21 2012
New Revision: 1226382

URL: http://svn.apache.org/viewvc?rev=1226382&view=rev
Log:
QPID-3713 : Implement producer side flow control for 0-10 in Java Broker

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
    qpid/trunk/qpid/java/test-profiles/Java010Excludes

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Mon Jan  2 10:01:21 2012
@@ -22,6 +22,9 @@ package org.apache.qpid.server.logging.s
 
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.transport.ServerSession;
+
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 
 public class ChannelLogSubject extends AbstractLogSubject
@@ -52,5 +55,30 @@ public class ChannelLogSubject extends A
                                session.getVirtualHost().getName(),
                                channel.getChannelId());
     }
-    
+
+    public ChannelLogSubject(ServerSession session)
+    {
+        /**
+         * LOG FORMAT used by the AMQPConnectorActor follows
+         * ChannelLogSubject.CHANNEL_FORMAT :
+         * con:{0}({1}@{2}/{3})/ch:{4}
+         *
+         * Uses a MessageFormat call to insert the required values according to
+         * these indices:
+         *
+         * 0 - Connection ID
+         * 1 - User ID
+         * 2 - IP
+         * 3 - Virtualhost
+         * 4 - Channel ID
+         */
+        ServerConnection connection = (ServerConnection) session.getConnection();
+        setLogStringWithFormat(CHANNEL_FORMAT,
+                               connection == null ? -1L : connection.getConnectionId(),
+                               session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
+                               (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
+                               session.getVirtualHost().getName(),
+                               session.getChannel());
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Mon Jan  2 10:01:21 2012
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
 
 public interface AMQSessionModel
 {
@@ -51,4 +53,8 @@ public interface AMQSessionModel
      * @param idleClose time in milliseconds before closing connection with idle transaction
      */
     public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
+
+    void block(AMQQueue queue);
+
+    void unblock(AMQQueue queue);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jan  2 10:01:21 2012
@@ -217,7 +217,7 @@ public interface AMQQueue extends Managa
 
     Map<String, Object> getArguments();
 
-    void checkCapacity(AMQChannel channel);
+    void checkCapacity(AMQSessionModel channel);
 
     /**
      * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Jan  2 10:01:21 2012
@@ -164,7 +164,7 @@ public class SimpleAMQQueue implements A
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
-    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+    private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -1528,7 +1528,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public void checkCapacity(AMQChannel channel)
+    public void checkCapacity(AMQSessionModel channel)
     {
         if(_capacity != 0l)
         {
@@ -1538,10 +1538,9 @@ public class SimpleAMQQueue implements A
                 //Overfull log message
                 _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
 
-                if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
-                {
-                    channel.block(this);
-                }
+                _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+
+                channel.block(this);
 
                 if(_atomicQueueSize.get() <= _flowResumeCapacity)
                 {
@@ -1573,7 +1572,7 @@ public class SimpleAMQQueue implements A
                 }
 
 
-                for(AMQChannel c : _blockedChannels.keySet())
+                for(AMQSessionModel c : _blockedChannels.keySet())
                 {
                     c.unblock(this);
                     _blockedChannels.remove(c);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Jan  2 10:01:21 2012
@@ -25,7 +25,6 @@ import static org.apache.qpid.util.Seria
 
 import java.security.Principal;
 import java.text.MessageFormat;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -33,8 +32,11 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.security.auth.Subject;
 import org.apache.qpid.AMQException;
@@ -45,11 +47,13 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.ConnectionConfig;
 import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -66,6 +70,11 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageSetFlowMode;
+import org.apache.qpid.transport.MessageStop;
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.Range;
@@ -81,6 +90,7 @@ public class ServerSession extends Sessi
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
     
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
+    private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30;
 
     private final UUID _id;
     private ConnectionConfig _connectionConfig;
@@ -88,6 +98,16 @@ public class ServerSession extends Sessi
     private LogActor _actor = GenericActor.getInstance(this);
     private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
 
+    private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+    private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new ConcurrentHashMap<Exchange, Boolean>();
+
+
+    private final AtomicBoolean _blocking = new AtomicBoolean(false);
+    private ChannelLogSubject _logSubject;
+    private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE);
+
+
     public static interface MessageDispositionChangeListener
     {
         public void onAccept();
@@ -132,7 +152,7 @@ public class ServerSession extends Sessi
         super(connection, delegate, name, expiry);
         _connectionConfig = connConfig;        
         _transaction = new AutoCommitTransaction(this.getMessageStore());
-
+        _logSubject = new ChannelLogSubject(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
     }
@@ -161,6 +181,10 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
     {
+        if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD)
+        {
+            invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD));
+        }
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
         PostEnqueueAction postTransactionAction;
         if(isTransactional())
@@ -661,6 +685,43 @@ public class ServerSession extends Sessi
         }
     }
 
+    public void block(AMQQueue queue)
+    {
+        if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+        {
+
+            if(_blocking.compareAndSet(false,true))
+            {
+                invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+                invoke(new MessageStop(""));
+                _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
+            }
+
+
+        }
+    }
+
+    public void unblock(AMQQueue queue)
+    {
+        if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+        {
+            if(_blocking.compareAndSet(true,false))
+            {
+
+                _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+                MessageFlow mf = new MessageFlow();
+                mf.setUnit(MessageCreditUnit.MESSAGE);
+                mf.setDestination("");
+                _oustandingCredit.set(Integer.MAX_VALUE);
+                mf.setValue(Integer.MAX_VALUE);
+                invoke(mf);
+
+
+            }
+        }
+    }
+
+
     public String toLogString()
     {
        return "[" +
@@ -701,7 +762,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    private static class PostEnqueueAction implements ServerTransaction.Action
+    private class PostEnqueueAction implements ServerTransaction.Action
     {
 
         private List<? extends BaseQueue> _queues;
@@ -732,7 +793,13 @@ public class ServerSession extends Sessi
             {
                 try
                 {
-                    _queues.get(i).enqueue(_message, _transactional, null);
+                    BaseQueue queue = _queues.get(i);
+                    queue.enqueue(_message, _transactional, null);
+                    if(queue instanceof AMQQueue)
+                    {
+                        ((AMQQueue)queue).checkCapacity(ServerSession.this);
+                    }
+
                 }
                 catch (AMQException e)
                 {
@@ -756,6 +823,6 @@ public class ServerSession extends Sessi
 
     public boolean getBlocking()
     {
-        return false; //TODO: Blocking not implemented on 0-10 yet.
+        return _blocking.get();
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Jan  2 10:01:21 2012
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -477,7 +476,7 @@ public class MockAMQQueue implements AMQ
         return null;
     }
 
-    public void checkCapacity(AMQChannel channel)
+    public void checkCapacity(AMQSessionModel channel)
     {
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java Mon Jan  2 10:01:21 2012
@@ -185,6 +185,12 @@ public abstract class Range implements R
             }
         }
 
+        public String toString()
+        {
+            return "[" + point + ", " + point + "]";
+        }
+
+
     }
 
     private static class RangeImpl extends Range
@@ -283,7 +289,7 @@ public abstract class Range implements R
             return range;
         }
 
-        @Override
+
         public void remove()
         {
             throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Jan  2 10:01:21 2012
@@ -61,7 +61,7 @@ import java.util.concurrent.atomic.Atomi
 public class Session extends SessionInvoker
 {
     private static final Logger log = Logger.get(Session.class);
-
+    
     public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
 
     static class DefaultSessionListener implements SessionListener
@@ -96,6 +96,9 @@ public class Session extends SessionInvo
     private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
                                         Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
                                                      ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
+    private final long blockedSendTimeout = Long.getLong("qpid.flow_control_wait_failure", timeout);
+    private long blockedSendReportingPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
     private boolean autoSync = false;
 
     private boolean incomingInit;
@@ -228,10 +231,21 @@ public class Session extends SessionInvo
         {
             try
             {
-                if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS))
+                long wait = blockedSendTimeout > blockedSendReportingPeriod ? blockedSendReportingPeriod :
+                           blockedSendTimeout;
+                long totalWait = 1L;
+                while(totalWait <= blockedSendTimeout && !credit.tryAcquire(wait, TimeUnit.MILLISECONDS))
+                {
+                    totalWait+=wait;
+                    log.warn("Message send delayed by " + (totalWait)/1000 + "s due to broker enforced flow control");
+
+
+                }
+                if(totalWait > blockedSendTimeout)
                 {
+                    log.error("Message send failed due to timeout waiting on broker enforced flow control");
                     throw new SessionException
-                        ("timed out waiting for message credit");
+                            ("timed out waiting for message credit");
                 }
             }
             catch (InterruptedException e)
@@ -815,7 +829,7 @@ public class Session extends SessionInvo
             while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
             {
                 checkFailoverRequired("Session sync was interrupted by failover.");
-                log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, commands);
+                log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, Arrays.asList(commands));
                 w.await();
             }
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Mon Jan  2 10:01:21 2012
@@ -154,8 +154,7 @@ public class ProducerFlowControlTest ext
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
 
-        Thread.sleep(5000);
-        List<String> results = waitAndFindMatches("QUE-1003");
+        List<String> results = waitAndFindMatches("QUE-1003", 7000);
 
         assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
 
@@ -199,11 +198,13 @@ public class ProducerFlowControlTest ext
         // try to send 5 messages (should block after 4)
         MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
 
-        Thread.sleep(TIMEOUT);
         List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
         assertTrue("No delay messages logged by client",results.size()!=0);
-        results = findMatches("Message send failed due to timeout waiting on broker enforced flow control");
-        assertEquals("Incorrect number of send failure messages logged by client",1,results.size());
+
+        List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced"
+                                                  + " flow control", TIMEOUT);
+        assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
+                     + "messages)",1,failedMessages.size());
 
 
 
@@ -325,8 +326,9 @@ public class ProducerFlowControlTest ext
 
 
         // try to send 5 messages (should block after 4)
-        MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+        MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 100L);
 
+        
         Thread.sleep(10000);
 
         Exception e = sender.getException();
@@ -440,6 +442,15 @@ public class ProducerFlowControlTest ext
                 e.printStackTrace();
                 throw new RuntimeException(e);
             }
+
+            try
+            {
+                Thread.sleep(sleepPeriod);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Mon Jan  2 10:01:21 2012
@@ -45,9 +45,6 @@ org.apache.qpid.server.logging.Subscript
 // 0-10 is not supported by the MethodRegistry
 org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
 
-//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
-
 //QPID-1864: rollback with subscriptions does not work in 0-10 yet
 org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message