activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michal Singer <Michal.Sin...@expand.com>
Subject Why does dequeue different from enqueue and from dispatch and queue size increases forever
Date Wed, 19 Mar 2008 15:57:36 GMT

Hi.
When i use spring to create a connection to send requests, i see in the
jconsole that the attributes on the Queue
DequeueCount
EnqueueCount
DispatchCount
are the same.
The queue size stays at 0-1

But when, i create a active mq connection for responses (instead of temp
queues), i get totally differnet numbers.
For example:

DequeueCount = 0
EnqueueCount = 467
DispatchCount = 31

and the queue size increases forever. and it is equal to EnqueueCount .

Does any one have any idea, what i am doing wrong?

Here is how i initalize the resources:
private synchronized void initMassagingConnections() /*throws
EVFailedProcessException*/
	{
		try
		{
			if (pcf == null && queueConnectionFactory == null)
			{
				pcf = new PooledConnectionFactory();
				queueConnectionFactory = new
ActiveMQConnectionFactory(QUEUE_CONNECION_FACTORY);
				queueConnectionFactory.setCopyMessageOnSend(false);
//				queueConnectionFactory.setProducerWindowSize(1024000);
				pcf.setConnectionFactory(queueConnectionFactory);
//				pcf.setMaxConnections(5);
//				pcf.setMaximumActive(5);
				
			}
			
			if(queueConnection == null)
			{
				queueConnection = (QueueConnection)pcf.createConnection();
				
				//queueConnection = queueConnectionFactory.createQueueConnection();
				queueConnection.start(); //Starts (or restarts) a connection's delivery
of incoming messages. A call to start on a connection that has already been
started is ignored.		
			}
			
			if(queueSessionSend == null)
			{
				/*
				 * If the session is transacted, the first parameter to
createQueueSession is true, 
				 * and the acknowledge mode parameter is ignored. If the session is not
transacted, 
				 * the first parameter to createQueueSession is false, and the
acknowledge mode parameter must be set.
				 */
				queueSessionSend = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
			}			
			if(queueSessionReceive == null)
			{
				queueSessionReceive = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
			}
			if(destinationSendClient == null)
			{
				destinationSendClient = queueSessionSend.createQueue(USER_QUEUE_NAME);
			}
			if(destinationSendAgent == null)
			{
				destinationSendAgent = queueSessionSend.createQueue(AGENT_QUEUE_NAME);
			}			
			if (queueSender == null)
			{
				queueSender = queueSessionSend.createSender(null);
			}
			if (destinationReceive == null)
			{
				destinationReceive =  queueSessionReceive.createQueue("result_queue");
			}
		}
		catch(JMSException e)
		{
			logger.error(e , e , EVError.JMS_CONNECTION_EXCEPTION);
			//e.printStackTrace();
		}	


public void operate(final LogicRequest logicRequest)
	{
//        QueueSender queueSender = null;
	
debugUtils.writeln("**************************************************************");
        try 
        {
        	final MessageConsumer messageConsumer; 	
        	final MessageConsumerObserver consumerObserver = new
MessageConsumerObserver();
//        	consumerObserver.setObserver((DefaultObserver)observer);
        	  	
//        	final TemporaryQueue destinationReceive =
queueSessionReceive.createTemporaryQueue();
        
         	messageConsumer =
queueSessionReceive.createConsumer(destinationReceive);
         	debugUtils.writeln("Producer: send request - " +
logicRequest.getLogicPath()+ ", temp queue for response: "+
destinationReceive.toString());
         	
         	
         	//System.out.println("CONSUMER ID: " +
messageConsumer.toString());
        	consumerObserver.setConsumer(messageConsumer , destinationReceive);
        		
        	messageConsumer.setMessageListener(new MessageListener()
        	{
        		public void onMessage(Message message)
        		{
        			try
        			{
        				message.acknowledge();
	        			Object logicResponse = ((ObjectMessage)message).getObject();
	         			
	        			if(logicResponse instanceof LogicResponse)
	        			{       				
	        				consumerObserver.update((LogicResponse)logicResponse);
	        			}
	        			else 
	        			{
	        				throw new JMSException("Fetched message from Temporary queue
which is not from type LogicResponse.");
	        			}
	        			
        			}
        			catch(Exception e)
        			{
        				logger.error(e, EVError.JMS_GET_MESSAGE);
        				//e.printStackTrace();
        			}  
        		}
        		
        	});
        	
        	
        	ObjectMessage objMsg =
queueSessionSend.createObjectMessage(logicRequest);
        	objMsg.setJMSReplyTo(destinationReceive);
        	
        	queueSender.send((Queue)destinationSendAgent, objMsg);
        	
        	
        	objMsg = null;
//        	queueSender.close();
         
        } 
        catch (JMSException e) 
        {
        	System.out.println(e.getLocalizedMessage());
        	System.out.println(e.toString());
           // e.printStackTrace();
        } 
        
	}
		 
-- 
View this message in context: http://www.nabble.com/Why-does-dequeue-different-from-enqueue-and-from-dispatch-and-queue-size-increases-forever-tp16144292s2354p16144292.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message