activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Bish <tabish...@gmail.com>
Subject Re: consumer.prefetchSize option on destination is not working
Date Thu, 17 Aug 2017 14:05:17 GMT
On 08/17/2017 07:53 AM, nayanateja9 wrote:
> Hi All,
>
> I am trying to use connsumer.prefetchSize on destination, I want to limit
> the no of messages sent from broker to consumer on destination basis, but it
> is not working for me, but when is set "<policyEntry queue="&gt;"
> queuePrefetch="2" >" on activemq.xml this is working properly, Please find
> the below program I am not acknowledging the messages in CLIENT_ACKNOWLEDGE
> , in this case I am receiving more than 2 messages from TEST.QUEUE5 (the
> queue has 3 messages), but the below program works fine when (queueprefetch
> applied on broker level). Please help
>
> import javax.jms.Message;
> import javax.jms.MessageListener;
>
> import org.apache.activemq.ActiveMQConnection;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.ActiveMQPrefetchPolicy;
> import org.apache.activemq.ActiveMQSession;
> import org.apache.activemq.command.ActiveMQQueue;
>
>
> public class Test implements MessageListener{
> 	
> 	public static void main(String s[]) throws Exception
> 	{
> 		ActiveMQConnection connection =null;
> 		ActiveMQSession session =null;
> 		
> 		try
> 		{
> 			Test test = new Test();
> 			ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
> 			connection = (ActiveMQConnection) factory.createConnection();
> 			connection.start();
> 			session = (ActiveMQSession)
> connection.createSession(false,ActiveMQSession.CLIENT_ACKNOWLEDGE);
> 			ActiveMQQueue queue = new
> ActiveMQQueue("TEST.QUEUE5?consumer.prefetchSize=2");
> 			ActiveMQMessageConsumer acmgConsume = (ActiveMQMessageConsumer)
> session.createConsumer(queue);
> 			System.out.println(acmgConsume.getPrefetchNumber());
> 			//System.out.println(session.isAsyncDispatch());
> 			acmgConsume.setMessageListener(test);
> 			
> 			//System.out.println(acmgConsume.receive());
> 			//System.out.println(acmgConsume.receive());
> 			//System.out.println(acmgConsume.receive());
> 			
> 			//System.out.println(session.isAsyncDispatch());
> 			//while(true);
> 		}catch(Exception e)
> 		{
> 			e.printStackTrace();
> 		} finally
> 		{
> 			session.close();
> 			connection.stop();
> 			connection.close();
> 		}
> 		
> 	}
>
> 	@Override
> 	public void onMessage(Message arg0) {
> 		// TODO Auto-generated method stub
> 		
> 		System.out.println(arg0);
> 	}
>
> }
>
> Output:
>
> 2
> ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId =
> ID:DA15060505-63172-1502968777734-3:1:1:1:1, originalDestination = null,
> originalTransactionId = null, producerId =
> ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
> queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
> 1502969052352, arrival = 0, brokerInTime = 1502969052355, brokerOutTime =
> 1502970437587, correlationId = , replyTo = null, persistent = false, type =
> , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
> compressed = false, userID = null, content = null, marshalledProperties =
> org.apache.activemq.util.ByteSequence@282ba1e, dataStructure = null,
> redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=1},
> readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
> Test Teja}
>
> ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId =
> ID:DA15060505-63172-1502968777734-3:1:1:1:2, originalDestination = null,
> originalTransactionId = null, producerId =
> ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
> queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
> 1502969052352, arrival = 0, brokerInTime = 1502969052356, brokerOutTime =
> 1502970437587, correlationId = , replyTo = null, persistent = false, type =
> , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
> compressed = false, userID = null, content = null, marshalledProperties =
> org.apache.activemq.util.ByteSequence@13b6d03, dataStructure = null,
> redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=2},
> readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
> Test Teja}
>
> ActiveMQTextMessage {commandId = 7, responseRequired = false, messageId =
> ID:DA15060505-63172-1502968777734-3:1:1:1:3, originalDestination = null,
> originalTransactionId = null, producerId =
> ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
> queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
> 1502969052352, arrival = 0, brokerInTime = 1502969052358, brokerOutTime =
> 1502970437601, correlationId = , replyTo = null, persistent = false, type =
> , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
> compressed = false, userID = null, content = null, marshalledProperties =
> org.apache.activemq.util.ByteSequence@f5f2bb7, dataStructure = null,
> redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=3},
> readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
> Test Teja}
>
>
>
>
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/consumer-prefetchSize-option-on-destination-is-not-working-tp4729727.html
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
> .
>
Support questions should only be directed to the users list, the dev 
list is where ActiveMQ development discussions are held.


-- 
Tim Bish


Mime
View raw message