qpid-users mailing list archives

From Robbie Gemmell <robbie.gemm...@gmail.com>
Subject Re: [Qpid JMS client] Asynchronous send JMS 2.0
Date Fri, 14 Jul 2017 14:31:57 GMT
On 14 July 2017 at 14:49, Timothy Bish <tabish121@gmail.com> wrote:
> On 07/14/2017 09:34 AM, Vavricka wrote:
>>
>> Hi,
>>
>>    qpid jms client version - 0.23.0
>>    qpid c++ broker version - 0.34
>>
>>    I am unable to send messages asynchronously using the JMS 2.0 API.
>>
>>    The connection string is the same for JMS 1.1 and JMS 2.0 -
>>
>> "amqp://host:20405?jms.username=admin&jms.password=admin&amqp.traceFrames=true&jms.forceAsyncSend=true&jms.forceAsyncAcks=true"
>>
>>    When I use the JMS 1.1 API, it works OK.
>>
>> *Code for JMS 1.1 below.*
>>
>> try
>> {
>>      Topic queue = (Topic) initialContext.lookup("node");
>>      Connection connection = ((ConnectionFactory)
>> initialContext.lookup("connection")).createConnection();
>>      Session session = connection.createSession(false,
>> Session.CLIENT_ACKNOWLEDGE);
>>      MessageProducer producer = session.createProducer(queue);
>>      connection.start();
>>      for (int i = 0; i < 500; i++)
>>      {
>>          producer.send(session.createTextMessage("test"));
>>      }
>> }
>> catch
>> ...
>>
>>   However, when I use the JMS 2.0 API, sending is extremely slow (1 message
>> per second).
>>
>> *Code for JMS 2.0 API below.*
>>
>> try (JMSContext context =
>> connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE))
>> {
>>      JMSProducer producer = context.createProducer();
>>      producer.setAsync(new SendCompletitionListener());
>>      for (int i = 0; i < senderOptionParser.getMessageCountValue(); i++)
>>      {
>>          producer.send(queue, context.createTextMessage("test"));
>>      }
>> }
>> catch
>> ...
>>
>> *Part of trace output of JMS 1.1 during one message sent:*
>>
>> {Output before producer.send(session.createTextMessage("test"));}
>> ...
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
>> message: JmsOutboundMessageDispatch {dispatchId =
>> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1, MessageID =
>> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1 }
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
>> deliveryId=0, deliveryTag=0, messageFormat=0, settled=null, more=false,
>> rcvSettleMode=null, state=null, resume=false, aborted=false,
>> batchable=false}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 2283 bytes
>> ...
>> {Output after producer.send(session.createTextMessage("test"));}
>>
>> *Part of trace output of JMS 2.0 during one message sent:*
>>
>> {Output before producer.send(queue, context.createTextMessage("test"));}
>> ...
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
>>
>> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
>> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
>> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1',
>> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
>> dynamicNodeProperties=null, distributionMode=null, filter=null,
>> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
>> amqp:released:list, amqp:modified:list], capabilities=null},
>> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> capabilities=[topic]},
>> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
>> maxMessageSize=null, offeredCapabilities=null,
>> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 324 bytes
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read:
>> 212
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 212, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV:
>>
>> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
>> handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST,
>> source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> distributionMode=null,
>> filter=null, defaultOutcome=null, outcomes=null, capabilities=null},
>> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> capabilities=[topic]},
>> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
>> maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
>> properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=2,
>> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
>> handle=0, deliveryCount=0, linkCredit=500, available=null, drain=false,
>> echo=false, properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_INIT
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_LOCAL_OPEN
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_REMOTE_OPEN
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Open
>> phase
>> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
>> message: JmsOutboundMessageDispatch {dispatchId =
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2, MessageID =
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2 }
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
>> deliveryId=1, deliveryTag=0, messageFormat=0, settled=null, more=false,
>> rcvSettleMode=null, state=null, resume=false, aborted=false,
>> batchable=false}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 2284 bytes
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_FLOW
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 45
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 45, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=3,
>> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
>> handle=0, deliveryCount=1, linkCredit=500, available=null, drain=false,
>> echo=false, properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_FLOW
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 38
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 38, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV:
>> Disposition{role=RECEIVER,
>> first=1, last=1, settled=true, state=Accepted{}, batchable=false}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> DELIVERY
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Outcome of delivery
>> was accepted: DeliveryImpl [_tag=[48],
>> _link=org.apache.qpid.proton.engine.impl.SenderImpl@67de5132,
>> _deliveryState=null, _settled=false, _remoteSettled=true,
>> _remoteDeliveryState=Accepted{}, _flags=0, _defaultDeliveryState=null,
>>
>> _transportDelivery=org.apache.qpid.proton.engine.impl.TransportDelivery@1303044f,
>> _dataSize=0, _complete=true, _updated=true, _done=true, _offset=0]
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Send
>> phase
>> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer
>> {
>> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } requesting close on
>> remote.
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_LOCAL_CLOSE
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Detach{handle=0,
>> closed=true, error=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 16 bytes
>> [epollEventLoopGroup-2-1] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 24
>> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 24, cap: 65536)
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Detach{handle=0,
>> closed=true, error=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_REMOTE_CLOSE
>> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
>> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer
>> {
>> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } is now closed:
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Close
>> phase of anonymous send complete:
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
>> LINK_FINAL
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Started
>> send chain for anonymous producer:
>> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
>> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
>> org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder - Creating
>> AmqpFixedProducer for: broadcast
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
>>
>> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2:broadcast',
>> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
>> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2',
>> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
>> dynamicNodeProperties=null, distributionMode=null, filter=null,
>> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
>> amqp:released:list, amqp:modified:list], capabilities=null},
>> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
>> timeout=0, dynamic=false, dynamicNodeProperties=null,
>> capabilities=[topic]},
>> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
>> maxMessageSize=null, offeredCapabilities=null,
>> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
>> [AmqpProvider:(1):[amqp://host:20405]] TRACE
>> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write
>> of:
>> 324 bytes
>> ...
>> {Output after producer.send(queue, context.createTextMessage("test"));}
>>
>> It seems to me that with the JMS 2.0 API the producer closes the link after
>> each send and then opens a new one when it wants to send another
>> message.
>>
>> Am I doing something wrong with JMS 2.0 API?
>>
>> Tomas
>
>
> You are using it correctly, although you are not necessarily comparing the
> two in a fair manner.  In your JMS 1.1 sample you are creating a producer
> with a target destination, so that producer will send to that destination
> on each call to send.  In the JMS 2.0 sample, however, you must create an
> anonymous producer, as that's how the JMS 2.0 JMSContext API is designed.
> If you really want to compare the two correctly, you should use a JMS 1.1
> anonymous producer by calling createProducer(null) and then sending to the
> target destination via producer.send(destination, message).
>
> The client is creating a new link on each send because the message broker
> in question doesn't support anonymous relay links (at least it hasn't
> advertised to the client that it does).  The client must therefore fall
> back to creating a new link each time you perform a send, using the address
> of the destination you've chosen to send the message to.  I'd guess your
> performance numbers will be much more comparable if you employ a JMS 1.1
> anonymous producer in your testing.
>

That would be the direct comparison, but I'd guess it would make them
both go equally slow :)

The fallback behaviour of opening and closing a link per message,
which is necessary to support anonymous producers against the older
broker, is slow. Even so, I expect it is the C++ broker's
journal-sync-bound acceptance of the persistent message that makes
things go quite as slow as is being seen with the JMSProducer. Only
reducing the broker's journal sync time can make that part faster.

An alternative suggestion against the older broker would be to
continue to create a 'classic API' JMS MessageProducer (rather than
'simplified API' JMSProducer) to the fixed destination, and use its
new JMS 2.0 send methods to do reliable async sends. That would avoid
the need to open new links for the sends. The message acceptance will
still be bound by the journal sync, but you could then do multiple
sends at a time reliably.
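
A minimal sketch of that alternative, assuming the same JNDI names
("connection" and "node") as the JMS 1.1 snippet above and a
jndi.properties on the classpath that defines them. The class name, the
latch-based wait and the 60 second timeout are illustrative choices only.
It uses the JMS 2.0 MessageProducer.send(Message, CompletionListener)
overload, so the loop does not wait for each acceptance before sending
the next message:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;

// Hypothetical class name; not part of the original thread.
public class ClassicAsyncSendSketch {

    public static void main(String[] args) throws Exception {
        // Assumption: "connection" and "node" are defined in jndi.properties,
        // as in the JMS 1.1 snippet quoted above.
        InitialContext initialContext = new InitialContext();
        ConnectionFactory factory =
                (ConnectionFactory) initialContext.lookup("connection");
        Topic topic = (Topic) initialContext.lookup("node");

        final int messageCount = 500; // same count as the JMS 1.1 loop
        final CountDownLatch completed = new CountDownLatch(messageCount);
        final AtomicInteger failures = new AtomicInteger();

        // Called by the client once the outcome of each send is known.
        CompletionListener listener = new CompletionListener() {
            @Override
            public void onCompletion(Message message) {
                completed.countDown();
            }

            @Override
            public void onException(Message message, Exception exception) {
                failures.incrementAndGet();
                completed.countDown();
            }
        };

        // Connection is AutoCloseable as of JMS 2.0.
        try (Connection connection = factory.createConnection()) {
            Session session =
                    connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Fixed-destination producer: one link is attached and reused,
            // so no anonymous fallback (attach/detach per message) is needed.
            MessageProducer producer = session.createProducer(topic);

            for (int i = 0; i < messageCount; i++) {
                // JMS 2.0 async send: returns without waiting for acceptance.
                producer.send(session.createTextMessage("test"), listener);
            }

            // Wait for all outcomes; the timeout is an arbitrary choice.
            boolean done = completed.await(60, TimeUnit.SECONDS);
            System.out.println("all completed=" + done
                    + ", failures=" + failures.get());
        }
    }
}
```

Each message is created fresh per send because a message handed to an
async send should not be touched again until its listener has been
called.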

>>
>>
>> --
>> View this message in context:
>> http://qpid.2158936.n2.nabble.com/Qpid-JMS-client-Asynchronous-send-JMS-2-0-tp7664785.html
>> Sent from the Apache Qpid users mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
>> For additional commands, e-mail: users-help@qpid.apache.org
>>
>>
>
> --
> Tim Bish
> twitter: @tabish121
> blog: http://timbish.blogspot.com/
>

