activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sonicBasher <sonicBas...@gmail.com>
Subject Re: ActiveMQ mixes up order of messages while receiving them from Camel
Date Thu, 01 Jul 2010 11:31:22 GMT

Hi Clark,

I solved my problem using transactions. Here's how I did it, maybe it'll
help someone who stumbles upon similar problem (you should probably create
the ActiveMQConnectionFactory and Session stuff elswhere and just pass a
reference, but I add it here to make things more clear):

import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Body;
import org.apache.camel.Handler;
import com.thoughtworks.xstream.XStream;

@Handler
    public void produce(@Body List listOfMyObj) throws JMSException {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        Connection connection = cf.createConnection();
        connection.start();
        XStream x = new XStream();
        List<MyObject> listOfMyObjects = (List<MyObject>) listOfMyObj;
        Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
        String id = listOfMyObjects.get(0).getAccountName();
        Queue queue = session.createQueue(id);
        javax.jms.MessageProducer producer = session.createProducer(queue);
        for (int q = 0; q < listOfMyObjects .size(); q++) {
            MyObject eo = listOfMyObjects .get(q);
            Message message = session.createTextMessage(x.toXML(eo));
            producer.send(message);
            if (q == listOfMyObjects .size() - 1) {
                session.commit();
            }
        }
    }



regards





cobrien wrote:
> 
> Hi,
> It appears like you are sending messages in async  mode (see link: 
> http://activemq.apache.org/async-sends.html). Order cannot be guaranteed,
> even for a single producer, using async mode. So the answer to your
> question is  yes the sync/async mode can change ordering particularly when 
> sent over  slow or unreliable networks. 
> 
> You also may want to look into using transactions to batch  message sends.
> The link below has an example Camel configuration.( under connection
> pooling)
> 
> http://camel.apache.org/activemq.html
> 
> 
> Clark 
> 
> www.ttmsolutions.com 
> ActiveMQ reference guide at 
> http://bit.ly/AMQRefGuide
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>  
> 
> 
> 
> 
> 
> 
> sonicBasher wrote:
>> 
>> Hi Clark,
>> 
>> I'm using Client-Server topology, meaning I just access ActiveMQ via tcp.
>> I do it like this:
>> 
>> final String brokerURL = "tcp://localhost:61616";
>> ConnectionFactory connectionFactory = new
>> ActiveMQConnectionFactory(brokerURL);
>> context.addComponent("jms",
>> JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
>> 
>> where context is obviously my DefaultCamelContext object.
>> 
>> As for the config files, I haven't changed anything. My activemq.xml
>> looks like this:
>> 
>> 
>> 
>> <!--
>>     Licensed to the Apache Software Foundation (ASF) under one or more
>>     contributor license agreements.  See the NOTICE file distributed with
>>     this work for additional information regarding copyright ownership.
>>     The ASF licenses this file to You under the Apache License, Version
>> 2.0
>>     (the "License"); you may not use this file except in compliance with
>>     the License.  You may obtain a copy of the License at
>>    
>>     http://www.apache.org/licenses/LICENSE-2.0
>>    
>>     Unless required by applicable law or agreed to in writing, software
>>     distributed under the License is distributed on an "AS IS" BASIS,
>>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>>     See the License for the specific language governing permissions and
>>     limitations under the License.
>> -->
>> <beans
>>   xmlns="http://www.springframework.org/schema/beans"
>>   xmlns:amq="http://activemq.apache.org/schema/core"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>>   http://activemq.apache.org/schema/core
>> http://activemq.apache.org/schema/core/activemq-core.xsd">
>> 
>>     <!-- Allows us to use system properties as variables in this
>> configuration file -->
>>     <bean
>> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>>         <property name="locations">
>>            
>> <value>file:${activemq.base}/conf/credentials.properties</value>
>>         </property>      
>>     </bean>
>> 
>>     <!-- 
>>         The <broker> element is used to configure the ActiveMQ broker. 
>>     -->
>>     <broker xmlns="http://activemq.apache.org/schema/core"
>> brokerName="localhost" dataDirectory="${activemq.base}/data"
>> destroyApplicationContextOnStop="true">
>>  
>>         <!--
>> 			For better performances use VM cursor and small memory limit.
>> 			For more information, see:
>>             
>>             http://activemq.apache.org/message-cursors.html
>>             
>>             Also, if your producer is "hanging", it's probably due to
>> producer flow control.
>>             For more information, see:
>>             http://activemq.apache.org/producer-flow-control.html
>>         -->
>>               
>>         <destinationPolicy>
>>             <policyMap>
>>               <policyEntries>
>>                 <policyEntry topic=">" producerFlowControl="true"
>> memoryLimit="1mb">
>>                   <pendingSubscriberPolicy>
>>                     <vmCursor />
>>                   </pendingSubscriberPolicy>
>>                 </policyEntry>
>>                 <policyEntry queue=">" producerFlowControl="true"
>> memoryLimit="1mb">
>>                   <!-- Use VM cursor for better latency
>>                        For more information, see:
>>                        
>>                        http://activemq.apache.org/message-cursors.html
>>                        
>>                   <pendingQueuePolicy>
>>                     <vmQueueCursor/>
>>                   </pendingQueuePolicy>
>>                   -->
>>                 </policyEntry>
>>               </policyEntries>
>>             </policyMap>
>>         </destinationPolicy> 
>>  
>>         
>>         <!-- 
>>             The managementContext is used to configure how ActiveMQ is
>> exposed in 
>>             JMX. By default, ActiveMQ uses the MBean server that is
>> started by 
>>             the JVM. For more information, see: 
>>             
>>             http://activemq.apache.org/jmx.html 
>>         -->
>>         <managementContext>
>>             <managementContext createConnector="false"/>
>>         </managementContext>
>> 
>>         <!-- 
>>             Configure message persistence for the broker. The default
>> persistence
>>             mechanism is the KahaDB store (identified by the kahaDB tag). 
>>             For more information, see: 
>>             
>>             http://activemq.apache.org/persistence.html 
>>         -->
>>         <persistenceAdapter>
>>             <kahaDB directory="${activemq.base}/data/kahadb"/>
>>         </persistenceAdapter>
>>         
>>         
>>           <!--
>>             The systemUsage controls the maximum amount of space the
>> broker will 
>>             use before slowing down producers. For more information, see:
>>             
>>             http://activemq.apache.org/producer-flow-control.html
>>              
>>         <systemUsage>
>>             <systemUsage>
>>                 <memoryUsage>
>>                     <memoryUsage limit="20 mb"/>
>>                 </memoryUsage>
>>                 <storeUsage>
>>                     <storeUsage limit="1 gb" name="foo"/>
>>                 </storeUsage>
>>                 <tempUsage>
>>                     <tempUsage limit="100 mb"/>
>>                 </tempUsage>
>>             </systemUsage>
>>         </systemUsage>
>> 		-->
>> 		  
>>         <!-- 
>>             The transport connectors expose ActiveMQ over a given
>> protocol to
>>             clients and other brokers. For more information, see: 
>>             
>>             http://activemq.apache.org/configuring-transports.html 
>>         -->
>>         <transportConnectors>
>>             <transportConnector name="openwire"
>> uri="tcp://0.0.0.0:61616"/>
>>         </transportConnectors>
>> 
>>     </broker>
>> 
>>     <!-- 
>>         Uncomment to enable Camel
>>         Take a look at activemq-camel.xml for more details
>>          
>>     <import resource="camel.xml"/>
>>     -->
>> 
>>     <!-- 
>>         Enable web consoles, REST and Ajax APIs and demos
>>         Take a look at activemq-jetty.xml for more details 
>>     -->
>>     <import resource="jetty.xml"/>
>>     
>> </beans>
>> 
>> 
>> 
>> I tried to enable camel in activemq.xml by uncommenting the  <import
>> resource="camel.xml"/> line, but it doesn't seem to help either - still
>> getting mixed up messages.
>> My camel.xml looks like that:
>> 
>> 
>> 
>> <!--
>>     Licensed to the Apache Software Foundation (ASF) under one or more
>>     contributor license agreements.  See the NOTICE file distributed with
>>     this work for additional information regarding copyright ownership.
>>     The ASF licenses this file to You under the Apache License, Version
>> 2.0
>>     (the "License"); you may not use this file except in compliance with
>>     the License.  You may obtain a copy of the License at
>>    
>>     http://www.apache.org/licenses/LICENSE-2.0
>>    
>>     Unless required by applicable law or agreed to in writing, software
>>     distributed under the License is distributed on an "AS IS" BASIS,
>>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>>     See the License for the specific language governing permissions and
>>     limitations under the License.
>> -->
>> <!-- 
>> 
>>     Lets deploy some Enterprise Integration Patterns inside the ActiveMQ
>> Message Broker
>>     For more information, see:
>>     
>>     http://camel.apache.org
>>     
>>     Include this file in your configuration to enable Camel
>>     
>>     e.g. <import resource="camel.xml"/>
>> -->
>> <beans
>>    xmlns="http://www.springframework.org/schema/beans"  
>>    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>    xsi:schemaLocation="http://camel.apache.org/schema/spring
>> http://camel.apache.org/schema/spring/camel-spring.xsd
>>    http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans.xsd">
>>   
>>   	<camelContext id="camel"
>> xmlns="http://camel.apache.org/schema/spring">
>> 
>>         <!-- You can use a <packages> element for each root package to
>> search for Java routes -->
>>         <packageScan>
>>            <package>org.foo.bar</package>
>>         </packageScan>
>> 
>>         <!-- You can use Spring XML syntax to define the routes here
>> using the <route> element -->
>>         <route>
>>             <from uri="activemq:example.A"/>
>>             <to uri="activemq:example.B"/>
>>         </route>
>>     </camelContext>
>> 
>>     <!--
>>        Lets configure some Camel endpoints
>>     
>>        http://camel.apache.org/components.html
>>     -->
>> 
>>     <!-- configure the camel activemq component to use the current broker
>> -->
>>     <bean id="activemq"
>> class="org.apache.activemq.camel.component.ActiveMQComponent" >
>>        <property name="userName" value="${activemq.username}"/>
>>        <property name="password" value="${activemq.password}"/>
>>     </bean>
>> </beans>
>> 
>> 
>> 
>> I'm not sure if any other config files might have impact on what is going
>> on in my case. If yes, please do let me know.
>> 
>> regards
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> cobrien wrote:
>>> 
>>> Hi,
>>> Can you describe a little more your Queue topology and configuration-
>>> perhaps even share your config file(s). Not all queue topologies are
>>> order preserving!
>>> -clark
>>> 
>>> Clark 
>>> 
>>> www.ttmsolutions.com 
>>> ActiveMQ reference guide at 
>>> http://bit.ly/AMQRefGuide
>>> 
>>> 
>>> sonicBasher wrote:
>>>> 
>>>> At the end of my camel route I have a following class which I use to
>>>> put messages in ActiveMQ: 
>>>> 
>>>> public class MessageProducer { 
>>>> 
>>>>     private ProducerTemplate template = null; 
>>>> 
>>>>     public MessageProducer(ProducerTemplate template) { 
>>>>         this.template = template; 
>>>>     } 
>>>> 
>>>>     public void produce(@Body List listOfEttexObj) { 
>>>>         System.out.println("Entered produce() method"); 
>>>>         XStream x = new XStream(); 
>>>>         List<EttexObject> listOfEttexObjects =
>>>> (List<EttexObject>)listOfEttexObj; 
>>>>         for (int q = 0; q < listOfEttexObjects.size(); q++) { 
>>>>             EttexObject eo = listOfEttexObjects.get(q); 
>>>>             template.sendBody("jms:" + eo.getAccountName() +
>>>> "_2",x.toXML(eo)); 
>>>>             System.out.println("Sent type: " +
>>>> eo.getMessage().getType()); 
>>>>         } 
>>>>     } 
>>>> } 
>>>> 
>>>> 
>>>> *XStream is just a library for converting objects to XML, has nothing
>>>> to do with anything
>>>> **EttexObject is my POJO class 
>>>> 
>>>> Now, I noticed that whenever I send objects (from the list passed as an
>>>> argument) to the queues, which I implicitly create using
>>>> ProducerTemplate, most of the time the order of delivered messages is
>>>> correct, but there's always one queue, which gets the mixed up
>>>> messages. I can't really figure out why. I tried to use the requestBody
>>>> method, instead of sendBody, to no avail. I also double checked the
>>>> lists that come in - the order of objects inside them is fine. Honestly
>>>> I'm kind of stuck here. Will be grateful for any tips. 
>>>> 
>>>> regards 
>>>> 
>>>> p.s. Can synchronous/asynchronous ActiveMQ mode have anything to do
>>>> with my problem? 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 

-- 
View this message in context: http://old.nabble.com/ActiveMQ-mixes-up-order-of-messages-while-receiving-them-from-Camel-tp29008581p29044634.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message