Author: kwall
Date: Thu Feb 12 15:18:16 2015
New Revision: 1659288
URL: http://svn.apache.org/r1659288
Log:
0-10 queue browser fix.
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
Thu Feb 12 15:18:16 2015
@@ -316,6 +316,7 @@ class QueueConsumerImpl
public final void flush()
{
_queue.flushConsumer(this);
+ _target.processPending();
}
public boolean resend(final QueueEntry entry)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Thu Feb 12 15:18:16 2015
@@ -286,7 +286,16 @@ public class SelectorThread extends Thre
@Override
public void run()
{
- processConnection(connection);
+ String currentName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString());
+ processConnection(connection);
+ }
+ finally
+ {
+ Thread.currentThread().setName(currentName);
+ }
}
});
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
Thu Feb 12 15:18:16 2015
@@ -61,7 +61,8 @@ public class ProtocolEngine_0_10 extend
private long _lastWriteTime = _createTime;
private volatile boolean _transportBlockedForWriting;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ServerProtocolEngine>> _workListener
= new AtomicReference<>();
@@ -81,13 +82,15 @@ public class ProtocolEngine_0_10 extend
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
if(!messageAssignmentSuspended)
{
for(AMQSessionModel<?,?> session : _connection.getSessionModels())
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Thu Feb 12 15:18:16 2015
@@ -209,19 +209,20 @@ public class AMQProtocolEngine implement
private long _maxMessageSize;
private volatile boolean _transportBlockedForWriting;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
if(!messageAssignmentSuspended)
{
for(AMQSessionModel<?,?> session : getSessionModels())
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
Thu Feb 12 15:18:16 2015
@@ -144,7 +144,7 @@ public class ProtocolEngine_1_0_0_SASL i
private State _state = State.A;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
@@ -166,13 +166,14 @@ public class ProtocolEngine_1_0_0_SASL i
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
if(!messageAssignmentSuspended)
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
Thu Feb 12 15:18:16 2015
@@ -290,7 +290,8 @@ public class SSLTest extends QpidBrokerT
ByteArrayOutputStream bout = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(bout));
String strace = bout.toString();
- assertTrue("Correct exception not thrown", strace.contains(expectedString));
+ assertTrue("Correct exception not thrown, expecting : " + expectedString + " got : " +e,
+ strace.contains(expectedString));
}
public void testVerifyLocalHost() throws Exception
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1659288&r1=1659287&r2=1659288&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
Thu Feb 12 15:18:16 2015
@@ -147,6 +147,8 @@ public class QueueBrowserAutoAckTest ext
assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth,
queueDepth);
+ getLogger().debug("KWDEBUG : About to check queue depth using browser");
+
// Browse the queue to get a second opinion
int msgCount = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
|