activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: Multiple consumers not consuming
Date Wed, 06 Nov 2013 15:52:28 GMT
If the default session is auto ack, then with prefetch=1, a second
message will be dispatched as soon as the first is acked, which will
be before the listener is invoked.
Using client ack mode will avoid this dispatch until the ack is received.
An alternative is to use receive(timeout) in a loop with prefetch=0.

On 19 April 2013 23:44, SledgeHammer <grosin@firstam.com> wrote:
> Here is my ActiveMQ.xml:
>
>
>
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         <property name="locations">
>             <value>file:${activemq.conf}/credentials.properties</value>
>         </property>
>     </bean>
>
>
>     <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="localhost" dataDirectory="${activemq.data}">
>
>
>
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" producerFlowControl="true">
>
>                   <pendingMessageLimitStrategy>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>                   </pendingMessageLimitStrategy>
>                 </policyEntry>
>                 <policyEntry queue=">" producerFlowControl="false"
> memoryLimit="1mb">
>
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>
>
>
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>
>
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>         </persistenceAdapter>
>
>
>
>           <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="64 mb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="100 gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="50 gb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
>
>
>         <transportConnectors>
>
>             <transportConnector name="openwire"
> uri="tcp://0.0.0.0:25055?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
>             <transportConnector name="amqp"
> uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
>         </transportConnectors>
>
>
>         <shutdownHooks>
>             <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
>         </shutdownHooks>
>
>     </broker>
>
>
>     <import resource="jetty.xml"/>
>
> </beans>
>
>
> I have already included the producer and consumer uri flags I am using. I am
> using the async message listeners if that has anything to do with it.
>
> I have tried deleting and re-creating the Queue.
>
> My producer set up code looks like:
>
>                         Uri connectUri = new
> Uri(String.Format("activemq:tcp://{0}:{1}?wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=true&jms.useCompression=true",
> server, port));
>
>                         if ((object)connectUri == null)
>                                 throw new ArgumentNullException("connectUri");
>
>                         IConnectionFactory factory = new NMSConnectionFactory(connectUri);
>
>                         if ((object)factory == null)
>                                 throw new Exception("Unable to create connection factory.");
>
>                         _connection = factory.CreateConnection();
>
>                         if ((object)_connection == null)
>                                 throw new Exception("Unable to create connection.");
>
>                         _session = _connection.CreateSession();
>
>                         if ((object)_session == null)
>                                 throw new Exception("Unable to create session.");
>
>                         string[] arrQueues = Enum.GetNames(typeof(Queue));
>
>                         if (arrQueues.Length > 0)
>                         {
>                                 _destinations = new IDestination[arrQueues.Length];
>
>                                 for (int nIndex = 0; nIndex < arrQueues.Length; nIndex++)
>                                 {
>                                         _destinations[nIndex] = SessionUtil.GetDestination(_session,
> String.Format("queue://{0}", arrQueues[nIndex]));
>
>                                         if ((object)_destinations[nIndex] == null)
>                                                 throw new Exception("Unable to get destination.");
>                                 }
>
>                                 _producers = new IMessageProducer[arrQueues.Length];
>                         }
>
>                         _tempQueue = _session.CreateTemporaryQueue();
>
>                         if ((object)_tempQueue == null)
>                                 throw new Exception("Unable to create response queue.");
>
>                         _consumer = _session.CreateConsumer(_tempQueue);
>
>                         if ((object)_consumer == null)
>                                 throw new Exception("Unable to create consumer.");
>                         else
>                                 _consumer.Listener += consumer_Listener;
>
>                         _connection.Start();
>
>
> So it's just newing up the standard objects and keeping them standard. I
> create a single temporary queue per producer to get responses from the
> clients.
>
> Creating a message is:
>
>                         Guid guid = Guid.NewGuid();
>
>                         if ((object)payload == null)
>                                 throw new ArgumentNullException("payload");
>
>                         IMessageProducer producer = _producers[(int)queue];
>
>                         if ((object)producer == null)
>                         {
>                                 _producers[(int)queue] =
> _session.CreateProducer(_destinations[(int)queue]);
>
>                                 if ((object)_producers[(int)queue] == null)
>                                         throw new Exception("Unable to create producer.");
>
>                                 producer = _producers[(int)queue];
>                         }
>
>                         IBytesMessage message =
> _session.CreateBytesMessage(Util.ObjectToByteArray(payload));
>
>                         message.NMSCorrelationID = guid.ToString();
>                         message.NMSReplyTo = _tempQueue;
>
>                         return new ActiveMQMessage(null, producer, null, message);
>
>
> The consumer set up is:
>
>                         Uri connectUri = new
> Uri(String.Format("activemq:tcp://{0}:{1}?wireFormat.tightEncodingEnabled=true&nms.PrefetchPolicy.QueuePrefetch=1",
> _strServer, _nPort));
>
>                         if ((object)connectUri == null)
>                                 throw new ArgumentNullException("connectUri");
>
>                         IConnectionFactory factory = new NMSConnectionFactory(connectUri);
>
>                         using (IConnection connection = factory.CreateConnection())
>                         {
>                                 using (ISession session = connection.CreateSession())
>                                 {
>                                         IDestination destination = SessionUtil.GetDestination(session,
> String.Format("queue://{0}", queue.ToString()));
>
>                                         using (IMessageConsumer consumer = session.CreateConsumer(destination))
>                                         {
>                                                 MessageListener handler = (x) => {
> consumer_Listener(session, x); };
>                                                 consumer.Listener += handler;
>                                                 connection.Start();
>                                                 cancellationToken.WaitHandle.WaitOne();
>                                                 consumer.Listener -= handler;
>                                         }
>                                 }
>                         }
>
> Let me know if you want to see any additional code :)
>
>
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Multiple-consumers-not-consuming-tp4666078p4666087.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.



-- 
http://redhat.com
http://blog.garytully.com

Mime
View raw message