camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aedwards <a...@middleware360.com>
Subject ProducerTemplate batch commits?
Date Tue, 04 Oct 2011 20:53:22 GMT
Hi all,

Using Camel 2.8.1

I am using a POJO producing bean to publish messages. 
http://camel.apache.org/pojo-producing.html     

This all works well.  However I occasionally have significant volume
(approximately 600k in my case).   Due to the high volume, I am doing some
performance testing.

In my performance testing, using JMS batches improves my performance
significantly, by almost 10x.   I am wondering if there is any way to send
multiple records in one transaction when using the Camel producer template?  
Perhaps a "producerTemplate.sendBatch(List<Exchange>)"?

* I have considered using the asyncSend methods, but I really don't want to
use up multiple threads/resources.   Using batch committing will make much
better use of my system resources.

** I also want to stick with a single message per record to avoid
excessively large payloads as well as allow the subscribers to filter
records based upon message selectors.


I have looked through the code, and can't seem to find any way to do this. 
Would appreciate any other suggestions.   Or if someone wanted to give me
some tips I could have a look at trying to help make any commits necessary
to add this functionality.   

Here is the test case I am using to validate performance:

package trans;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;

public class PerformanceTest extends CamelTestSupport {
	String brokerUrl = "vm://localhost";
	String topicName = "test.incoming";
	String topicUri  = "jms:topic:test.incoming";
	String mockUri   = "mock:result";
	final String data      = "sample data message haaaaaaaaaaaaaaaaaaaa";
	int sendMessages = 5000;
	ActiveMQConnectionFactory connectionFactory;
	
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        ActiveMQComponent amq = new ActiveMQComponent();
        amq.setBrokerURL(brokerUrl+"?broker.persistent=true");
        camelContext.addComponent("jms", amq);
        
    	connectionFactory = new ActiveMQConnectionFactory();
    	connectionFactory.setBrokerURL(brokerUrl);
    	
        return camelContext;
    }
    
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            
            @Override
            public void configure() throws Exception {
                from(topicUri).id("route1").to(mockUri);              
            }
        };
    }

    @Test
    public void sendUsingCamelProducerTemplate() throws Exception {

    	this.getMockEndpoint(mockUri).expectedMessageCount(sendMessages);
    	long startTime = System.currentTimeMillis();
    	
    	for (int i=0;i&lt;sendMessages;i++) {
    		this.template().sendBody(this.topicUri, data);
    	}
    	
    	long finishTime = System.currentTimeMillis();
    	log.info(&quot;************************** Finish time: &quot; +
((finishTime - startTime)));
    	this.getMockEndpoint(this.mockUri).assertIsSatisfied();
    }

    
    @Test
    public void sendUsingJmsTemplateBatchCommit() throws Exception {

    	this.getMockEndpoint(this.mockUri).expectedMessageCount(sendMessages);
    	
    	JmsTemplate template = new JmsTemplate();
    	template.setConnectionFactory(connectionFactory);
    	template.setSessionTransacted(true);
    	template.setDeliveryMode(DeliveryMode.PERSISTENT);
    	template.setPubSubDomain(true);
    	
    	long startTime = System.currentTimeMillis();

		template.execute(new SessionCallback&lt;Object&gt;() {
			public Object doInJms(Session s) {
				try {
			    	Topic t = s.createTopic(topicName);
			    	MessageProducer messageProducer = s.createProducer(t);
			    	messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
			    	for (int i=0;i<sendMessages;i++) {
			    		Message m = s.createTextMessage(data);
			    		messageProducer.send(m);
			    	}
			    	s.commit();
				} catch (Exception e) {
					log.error("caught exception: ", e);
				}
			    	return null;
				}
			});
    	
    	long finishTime = System.currentTimeMillis();
    	log.info("************************** Finish time: " + ((finishTime -
startTime)));
    
    	this.getMockEndpoint(this.mockUri).assertIsSatisfied();
    	
    }

}


--
View this message in context: http://camel.465427.n5.nabble.com/ProducerTemplate-batch-commits-tp4870387p4870387.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message