qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Bish <tabish...@gmail.com>
Subject Re: [Qpid JMS client] Asynchronous send JMS 2.0
Date Fri, 14 Jul 2017 13:49:53 GMT
On 07/14/2017 09:34 AM, Vavricka wrote:
> Hi,
>
>    qpid jms client version - 0.23.0
>    qpid c++ broker version - 0.34
>
>    I am unable to send messages asynchronously using the JMS 2.0 API.
>
>    Connection string is same for JMS 1.1 and JMS 2.0 -
> "amqp://host:20405?jms.username=admin&jms.password=admin&amqp.traceFrames=true&jms.forceAsyncSend=true&jms.forceAsyncAcks=true"
>
>    When I use JMS 1.1 API, it works ok.
>
> *Code for JMS 1.1 below.*
>
> try
> {
>      Topic queue = (Topic) initialContext.lookup("node");
>      Connection connection = ((ConnectionFactory)
> initialContext.lookup("connection")).createConnection();
>      Session session = connection.createSession(false,
> Session.CLIENT_ACKNOWLEDGE);
>      MessageProducer producer = session.createProducer(queue);
>      connection.start();
>      for (int i = 0; i < 500; i++)
>      {
>          producer.send(session.createTextMessage("test"));
>      }
> }
> catch
> ...
>
>   However when I use JMS 2.0 API, sending is extremely slow (1 message per
> second).
>
> *Code for JMS 2.0 API below.*
>
> try (JMSContext context =
> connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE))
> {
>      JMSProducer producer = context.createProducer();
>      producer.setAsync(new SendCompletitionListener());
>      for (int i = 0; i < senderOptionParser.getMessageCountValue(); i++)
>      {
>          producer.send(queue, context.createTextMessage("test"));
>      }
> }
> catch
> ...
>
> *Part of trace output of JMS 1.1 during one message sent:*
>
> {Output before producer.send(session.createTextMessage("test"));}
> ...
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
> message: JmsOutboundMessageDispatch {dispatchId =
> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1, MessageID =
> ID:dfbce852-0cdc-4669-99df-e9d59569b0f6:1:1:1-1 }
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
> deliveryId=0, deliveryTag=0, messageFormat=0, settled=null, more=false,
> rcvSettleMode=null, state=null, resume=false, aborted=false,
> batchable=false}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 2283 bytes
> ...
> {Output after producer.send(session.createTextMessage("test"));}
>
> *Part of trace output of JMS 2.0 during one message sent:*
>
> {Output before producer.send(queue, context.createTextMessage("test"));}
> ...
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1',
> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
> dynamicNodeProperties=null, distributionMode=null, filter=null,
> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
> amqp:released:list, amqp:modified:list], capabilities=null},
> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
> maxMessageSize=null, offeredCapabilities=null,
> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 324 bytes
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 212
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 212, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV:
> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1:broadcast',
> handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST,
> source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
> filter=null, defaultOutcome=null, outcomes=null, capabilities=null},
> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
> maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
> properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=2,
> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
> handle=0, deliveryCount=0, linkCredit=500, available=null, drain=false,
> echo=false, properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_INIT
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_LOCAL_OPEN
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_REMOTE_OPEN
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Open phase
> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Producer sending
> message: JmsOutboundMessageDispatch {dispatchId =
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2, MessageID =
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1-2 }
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Transfer{handle=0,
> deliveryId=1, deliveryTag=0, messageFormat=0, settled=null, more=false,
> rcvSettleMode=null, state=null, resume=false, aborted=false,
> batchable=false}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 2284 bytes
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 45
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 45, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Flow{nextIncomingId=3,
> incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647,
> handle=0, deliveryCount=1, linkCredit=500, available=null, drain=false,
> echo=false, properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: LINK_FLOW
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 38
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 38, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Disposition{role=RECEIVER,
> first=1, last=1, settled=true, state=Accepted{}, batchable=false}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event: DELIVERY
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpFixedProducer - Outcome of delivery
> was accepted: DeliveryImpl [_tag=[48],
> _link=org.apache.qpid.proton.engine.impl.SenderImpl@67de5132,
> _deliveryState=null, _settled=false, _remoteSettled=true,
> _remoteDeliveryState=Accepted{}, _flags=0, _defaultDeliveryState=null,
> _transportDelivery=org.apache.qpid.proton.engine.impl.TransportDelivery@1303044f,
> _dataSize=0, _complete=true, _updated=true, _done=true, _offset=0]
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Send phase
> of anonymous send complete: ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer {
> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } requesting close on remote.
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_LOCAL_CLOSE
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT: Detach{handle=0,
> closed=true, error=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 16 bytes
> [epollEventLoopGroup-2-1] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - New data read: 24
> bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 24, cap: 65536)
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - RECV: Detach{handle=0,
> closed=true, error=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_REMOTE_CLOSE
> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource - AmqpFixedProducer {
> ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:1 } is now closed:
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Close
> phase of anonymous send complete:
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpProvider - New Proton Event:
> LINK_FINAL
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.AmqpAnonymousFallbackProducer - Started
> send chain for anonymous producer:
> ID:fe9d80f0-ab3f-45e1-bb7e-3950e460b0a3:1:1:1
> [AmqpProvider:(1):[amqp://host:20405]] DEBUG
> org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder - Creating
> AmqpFixedProducer for: broadcast
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.provider.amqp.FRAMES - SENT:
> Attach{name='qpid-jms:sender:ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2:broadcast',
> handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
> source=Source{address='ID:8261d327-2100-47a1-8860-69bc1d0fd6ae:1:-1:2',
> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
> dynamicNodeProperties=null, distributionMode=null, filter=null,
> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list,
> amqp:released:list, amqp:modified:list], capabilities=null},
> target=Target{address='broadcast', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[topic]},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
> maxMessageSize=null, offeredCapabilities=null,
> desiredCapabilities=[DELAYED_DELIVERY], properties=null}
> [AmqpProvider:(1):[amqp://host:20405]] TRACE
> org.apache.qpid.jms.transports.netty.NettyTcpTransport - Attempted write of:
> 324 bytes
> ...
> {Output after producer.send(queue, context.createTextMessage("test"));}
>
> It seems to me that in JMS 2.0 api producer closes link after each send of
> message and then opens a new one when producer wants to send another
> message.
>
> Am I doing something wrong with JMS 2.0 API?
>
> Tomas

You are using it correctly although not necessarily comparing the two in 
a fair manner.  In your JMS 1.1 sample you are creating a producer with 
a target destination so that producer will send to that destination on 
each call send.  However in the JMS 2.0 sample you must create an 
anonymous producer as that's how the 2.0 JMS Context API is designed so 
if you really want to compare the two correctly you should use a JMS 1.1 
anonymous producer by calling createProducer(null) and then sending to 
the target destination via producer.send(destination, message).

The reason that the client is creating a new link on each send is that 
the message broker in question doesn't support anonymous relay links (at 
least it hasn't advertised to the client that it does) so the client 
must resort to creating a new link each time you perform a send using 
the address of the destination you've chosen to send the message to.  
I'd guess your performance numbers will be much more comparable if you 
employ a JMS 1.1 anonymous producer in your testing.

>
>
> --
> View this message in context: http://qpid.2158936.n2.nabble.com/Qpid-JMS-client-Asynchronous-send-JMS-2-0-tp7664785.html
> Sent from the Apache Qpid users mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>

-- 
Tim Bish
twitter: @tabish121
blog: http://timbish.blogspot.com/


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Mime
View raw message