qpid-dev mailing list archives

From "Keith Wall (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (QPID-8210) [Broker-J] Set queue consumer node before adding consumer into queue consumer manager
Date Tue, 19 Jun 2018 18:01:00 GMT

    [ https://issues.apache.org/jira/browse/QPID-8210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517377#comment-16517377
] 

Keith Wall edited comment on QPID-8210 at 6/19/18 6:00 PM:
-----------------------------------------------------------

Change looks reasonable to me, but I notice that {{org.apache.qpid.server.queue.QueueConsumerImpl#_queueConsumerNode}} is
unsafely published.   It is written by the config thread then observed from the IO threads
without a memory barrier.  Unfortunately, I think it needs to be volatile (or the code redesigned
to avoid the cycle between {{QueueConsumerImpl}} and the node, which is more than will be
desired for 7.0.6).

 Also the last paragraphs of this Jira's description have been mangled.  The Jira title
should also be updated to reflect the problem, rather than the solution.
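
To illustrate the publication concern raised in this comment, here is a minimal sketch, assuming simplified stand-in classes rather than the real Broker-J code. {{Consumer}}, {{Node}}, {{setNode}}, {{getNode}} and {{publishAndRead}} are hypothetical names chosen for illustration. One thread writes the node field and another reads it; marking the field volatile gives the write a happens-before edge to any later read that observes it.

```java
public class VolatilePublicationSketch {

    /** Hypothetical stand-in for the queue consumer node. */
    static final class Node {
        final String name;

        Node(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for the consumer that holds a reference to its node. */
    static final class Consumer {
        // volatile: a write by one thread is visible to a later read by another thread.
        // Without it, the reader is not guaranteed to ever observe the new value.
        private volatile Node node;

        void setNode(Node node) {
            this.node = node;
        }

        Node getNode() {
            return node;
        }
    }

    /** Writes the node on a "config" thread and reads it from an "io" thread. */
    static String publishAndRead() {
        Consumer consumer = new Consumer();
        String[] seen = new String[1];

        Thread io = new Thread(() -> {
            Node n;
            // Poll until the node written by the config thread becomes visible.
            while ((n = consumer.getNode()) == null) {
                Thread.yield();
            }
            seen[0] = n.name;
        }, "io");
        Thread config = new Thread(() -> consumer.setNode(new Node("node-1")), "config");

        io.start();
        config.start();
        try {
            config.join();
            io.join(); // join() makes the write to seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead());
    }
}
```

The alternative mentioned above, removing the cycle so that the node can be a final field assigned in the constructor, would avoid the volatile read but requires a larger redesign.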


was (Author: k-wall):
Change looks reasonable to me, but I notice that {{org.apache.qpid.server.queue.QueueConsumerImpl#_queueConsumerNode}} is
unsafely published.   It is written by the config thread then observed from the IO threads
without a memory barrier.  Unfortunately, I think it needs to be volatile (or the code redesigned
to avoid the cycle between {{QueueConsumerImpl}} and the node, which is more than will be
desired for 7.0.6).

 Also the last paragraphs of this Jira's description have been mangled.

> [Broker-J] Set queue consumer node before adding consumer into queue consumer manager
> -------------------------------------------------------------------------------------
>
>                 Key: QPID-8210
>                 URL: https://issues.apache.org/jira/browse/QPID-8210
>             Project: Qpid
>          Issue Type: Bug
>          Components: Broker-J
>    Affects Versions: qpid-java-broker-7.0.3, qpid-java-broker-7.0.2, qpid-java-broker-7.0.0,
qpid-java-broker-7.0.1, qpid-java-broker-7.0.4, qpid-java-broker-7.0.5
>            Reporter: Alex Rudyy
>            Priority: Critical
>             Fix For: qpid-java-broker-7.0.6
>
>
> The consumer node is currently set after the consumer is added to the queue consumer manager.
> When a released message is requeued, all consumers in the consumer manager are iterated and
> AbstractQueue#updateSubRequeueEntry is invoked for each of them to update the released entry
> in the consumer's queue context if required. Notifying a consumer that does not yet have a
> consumer node can result in a NullPointerException:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.qpid.server.queue.QueueConsumerManagerImpl.setNotified(QueueConsumerManagerImpl.java:151)
>     at org.apache.qpid.server.queue.AbstractQueue.notifyConsumer(AbstractQueue.java:2268)
>     at org.apache.qpid.server.queue.AbstractQueue.updateSubRequeueEntry(AbstractQueue.java:1382)
>     at org.apache.qpid.server.queue.AbstractQueue.resetSubPointers(AbstractQueue.java:1413)
>     at org.apache.qpid.server.queue.AbstractQueue.requeue(AbstractQueue.java:1399)
>     at org.apache.qpid.server.queue.QueueEntryImpl.postRelease(QueueEntryImpl.java:446)
>     at org.apache.qpid.server.queue.QueueEntryImpl.release(QueueEntryImpl.java:437)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.requeue(AMQChannel.java:896)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.close(AMQChannel.java:787)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:459)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:430)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.receiveChannelClose(AMQChannel.java:2167)
>     at org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody.process(ChannelCloseBody.java:140)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.processMethod(ServerDecoder.java:132)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:203)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.doProcessFrame(BrokerDecoder.java:141)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processFrame(BrokerDecoder.java:65)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:104)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:97)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processAMQPFrames(BrokerDecoder.java:96)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:257)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:249)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.received(AMQPConnection_0_8Impl.java:248)
>     at org.apache.qpid.server.transport.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:135)
>     at org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData(NonBlockingConnection.java:610)
>     at org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData(NonBlockingConnectionPlainDelegate.java:58)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doRead(NonBlockingConnection.java:496)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:270)
>     at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:134)
>     at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection(SelectorThread.java:575)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect(SelectorThread.java:366)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.run(SelectorThread.java:97)
>     at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:533)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.lambda$null$0(QpidByteBufferFactory.java:464)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The issue can occur in the following scenarios:
> * when a previously acquired message is released due to consumer/session/connection close
> while another consumer is attached to the queue at the same time;
> * when a transaction is rolled back or a message is released (for example, JMS session
> Session#release() is called for a client_ack session) and a new competing consumer is
> attached at the same time.
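
The ordering problem described in the issue can be sketched as follows. This is a simplified, single-threaded illustration under stated assumptions, not the Broker-J implementation: {{Consumer}}, {{Node}}, {{ConsumerManager}}, {{register}} and {{setNotifiedAll}} are hypothetical names. The first helper reproduces the window in which a consumer is already visible to iteration but has no node; the second shows the ordering the issue title proposes, where the node is set before the consumer is added.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConsumerRegistrationOrderSketch {

    /** Hypothetical stand-in for the queue consumer node. */
    static final class Node {
        private volatile boolean notified;

        void setNotified(boolean notified) {
            this.notified = notified;
        }

        boolean isNotified() {
            return notified;
        }
    }

    /** Hypothetical stand-in for a queue consumer. */
    static final class Consumer {
        private volatile Node node;

        void setNode(Node node) {
            this.node = node;
        }

        Node getNode() {
            return node;
        }
    }

    /** Hypothetical stand-in for the queue consumer manager. */
    static final class ConsumerManager {
        final List<Consumer> consumers = new CopyOnWriteArrayList<>();

        /** Proposed ordering: the node is set before the consumer becomes visible. */
        void register(Consumer consumer) {
            consumer.setNode(new Node());
            consumers.add(consumer);
        }

        /** Mimics the requeue path: touches the node of every visible consumer. */
        void setNotifiedAll() {
            for (Consumer consumer : consumers) {
                // Throws NullPointerException if the consumer has no node yet.
                consumer.getNode().setNotified(true);
            }
        }
    }

    /** Reproduces the problematic window: consumer visible, node not yet set. */
    static boolean notifyFailsWhenNodeMissing() {
        ConsumerManager manager = new ConsumerManager();
        manager.consumers.add(new Consumer());
        try {
            manager.setNotifiedAll();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** With the node set first, iteration never sees a consumer without a node. */
    static boolean notifySucceedsWhenNodeSetFirst() {
        ConsumerManager manager = new ConsumerManager();
        Consumer consumer = new Consumer();
        manager.register(consumer);
        manager.setNotifiedAll();
        return consumer.getNode().isNotified();
    }

    public static void main(String[] args) {
        System.out.println(notifyFailsWhenNodeMissing());
        System.out.println(notifySucceedsWhenNodeSetFirst());
    }
}
```

In the real broker the two steps run on different threads, so the failure depends on timing; the sketch forces the interleaving to make it deterministic.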




