camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rjsteixeira <rica...@ctrler.eu>
Subject Camel gradually slowing down with large sql results
Date Mon, 14 Oct 2013 17:39:20 GMT
Hi.

I'm seeing a constant slowdown in Camel. It starts at 600 inserts/s, but the
run never finishes because the rate drops to 5 or 6 inserts a second...


Here is a graph with part of the slowdown (as seen from RabbitMQ):
http://i.imgur.com/QAS6nwg.png
In this graph you can see the end of one run (I killed it) and the start of
another, the difference is staggering: http://i.imgur.com/dhd9GG1.png

I'm consuming from a large database table with a simple query that outputs
about 400 000 lines that I then insert into a RabbitMQ queue.

The insert rate starts really high and then starts to fall gradually.
I've tried running Camel without inserting into the queue and there is also a
slowdown in processing.

The CPU is always maxed out with java...

From the same machine I can consume from the queue with Storm at about 3000
lines/sec, without slowdowns and with unmarshaling with XStream (8000
lines/s without unmarshaling).

Can anyone point me in the right direction to solve this?

Thanks.

#########################################################
Here is my Spring file.

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Spring application context for a Camel job that reads rows through the SQL
  component and publishes each row, marshalled with XStream, to RabbitMQ.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://camel.apache.org/schema/spring 
       http://camel.apache.org/schema/spring/camel-spring.xsd">

	
	<!-- JDBC data source (Commons DBCP BasicDataSource) for the Oracle URL below;
	     close() is registered as the Spring destroy method. -->
	<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
		destroy-method="close">
		<property name="driverClassName" value="oracle.jdbc.OracleDriver" />
		<property name="url" value="jdbc:oracle:thin:@localhost:1521:xe" />
		<property name="username" value="sys as sysdba" />
		<property name="password" value="xxxxx" />
	</bean>

	
	<!-- Camel SQL component wired to the data source above; it backs the "sql:"
	     endpoint used in databaseRoute. -->
	<bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
		<property name="dataSource" ref="dataSource" />
	</bean>

	
	<!-- Camel context holding both routes; stream caching is switched on via
	     streamCache="true". -->
	<camelContext streamCache="true"
xmlns="http://camel.apache.org/schema/spring">
	
	
		
		<!-- Resolves {{...}} placeholders (e.g. {{sql.selectOrderIdMax}}) from
		     sql.properties on the classpath. -->
		<propertyPlaceholder id="placeholder" location="classpath:sql.properties"
/>

		
		<!-- XStream data formats: one with an explicit UTF-8 encoding and one with
		     defaults. Only xstream-utf8 is referenced by the routes in this file. -->
		<dataFormats>
			<xstream id="xstream-utf8" encoding="UTF-8" />
			<xstream id="xstream-default" />
		</dataFormats>
	
		
		<!-- Started by a Quartz2 trigger (repeatInterval=20000, repeatCount=0), runs
		     the query configured as sql.selectOrderIdMax, then splits the resulting
		     body with streaming and parallel processing enabled, sending each
		     element to direct:sqlqueue. -->
		<route id="databaseRoute">
			<from
uri="quartz2://myGroup/myTimerName?trigger.repeatInterval=20000&amp;trigger.repeatCount=0"
/>
 			<to uri="sql:{{sql.selectOrderIdMax}}" />
			<split streaming="true" parallelProcessing="true">
				<simple>${body}</simple>
				<to uri="direct:sqlqueue" />
			</split>
		</route> 

		<!-- Handles one split element at a time: lineProcessor replaces the body,
		     xstream-utf8 marshals it, and the result is sent to the RabbitMQ
		     exchange "myExchange". -->
 		<route id="processOrder-route" >
			<from uri="direct:sqlqueue" />
			<process ref="lineProcessor" />
			<marshal ref="xstream-utf8" />
			<to
uri="rabbitmq://xx.x.x.x:5672/myExchange?username=xxx&amp;password=xxx" />
		</route>
		

	</camelContext>

	
	<!-- Processor referenced by processOrder-route via <process ref="lineProcessor" />. -->
	<bean id="lineProcessor"
class="eu.rteixeira.dis.cameldbtoqueue.LineProcessor" />

</beans>

#########################################################
The SQL is:
sql.selectOrderIdMax=select * from sales where integration_id = (select
max(integration_id) as max_int from sales)

#########################################################
The LineProcessor just constructs an object for each line of the results
to then pass to the serializer. The slowdown happened without it.

/**
 * 
 */
package eu.rteixeira.dis.cameldbtoqueue;

import eu.rteixeira.dis.objmodel.LineBean;

import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/**
 * Camel processor that swaps the incoming message body, expected to be a
 * column-name-to-value map for a single row, for a {@link LineBean} built
 * from that map.
 */
public class LineProcessor implements Processor {

	/**
	 * Replaces the body of the exchange's in-message with a {@link LineBean}.
	 *
	 * @param exchange the exchange whose in-message body is a {@code Map<String, ?>}
	 * @throws Exception if the body is not a map or the bean cannot be built from it
	 */
	public void process(Exchange exchange) throws Exception {
		Object payload = exchange.getIn().getBody();

		// Build the bean first so a conversion failure leaves the message untouched.
		LineBean bean = new LineBean((Map<String, ?>) payload);

		exchange.getIn().setBody(bean);
	}
}

#########################################################
The LineBean

package eu.rteixeira.dis.objmodel;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Map;

public class LineBean {
	
	Integer store;
	Integer sku;
	String ean;
	BigDecimal qty;
	BigDecimal pvp; 
	Timestamp sales_date; 
	Timestamp vdate;
	Integer physical_store;
	Integer integration_id;
	Integer group_no;
	String tran_type;
	BigDecimal sales_value;
	BigDecimal discount_qty;
	BigDecimal discount_value;
	BigDecimal discount_voucher_qty;
	BigDecimal discount_voucher_value;
	BigDecimal base_value;
	
	
	
	public LineBean() {
		super();
	}

	/**
	 * Builds a bean from one result row.
	 *
	 * Each column is looked up once by its lower-case name and converted through
	 * its {@code toString()} form. A column that is missing from the map, or
	 * mapped to {@code null}, leaves the matching field {@code null}.
	 *
	 * @param row column name to column value for a single row; must not be {@code null}
	 * @throws NumberFormatException if an integer or decimal column does not parse
	 * @throws IllegalArgumentException if a timestamp column is not in the format
	 *         accepted by {@link Timestamp#valueOf(String)}
	 */
	public LineBean(Map<String, ?> row){

		// "store" used to be null-checked under "store" but read under "STORE",
		// which throws a NullPointerException on a case-sensitive map that only
		// holds the lower-case key. Both now use the same key as every other column.
		store = asInteger(row.get("store"));
		sku = asInteger(row.get("sku"));
		ean = asText(row.get("ean"));
		qty = asDecimal(row.get("qty"));
		pvp = asDecimal(row.get("pvp"));
		sales_date = asTimestamp(row.get("sales_date"));
		vdate = asTimestamp(row.get("vdate"));
		physical_store = asInteger(row.get("physical_store"));
		integration_id = asInteger(row.get("integration_id"));
		group_no = asInteger(row.get("group_no"));
		tran_type = asText(row.get("tran_type"));
		sales_value = asDecimal(row.get("sales_value"));
		discount_qty = asDecimal(row.get("discount_qty"));
		discount_value = asDecimal(row.get("discount_value"));
		discount_voucher_qty = asDecimal(row.get("discount_voucher_qty"));
		discount_voucher_value = asDecimal(row.get("discount_voucher_value"));
		base_value = asDecimal(row.get("base_value"));

	}

	/** Returns the value's string form, or {@code null} when the value is {@code null}. */
	private static String asText(Object value) {
		return (value == null) ? null : value.toString();
	}

	/** Parses the value's string form as an {@code Integer}, or returns {@code null} for {@code null}. */
	private static Integer asInteger(Object value) {
		return (value == null) ? null : Integer.valueOf(value.toString());
	}

	/** Parses the value's string form as a {@code BigDecimal}, or returns {@code null} for {@code null}. */
	private static BigDecimal asDecimal(Object value) {
		return (value == null) ? null : new BigDecimal(value.toString());
	}

	/** Parses the value's string form as a {@code Timestamp}, or returns {@code null} for {@code null}. */
	private static Timestamp asTimestamp(Object value) {
		return (value == null) ? null : Timestamp.valueOf(value.toString());
	}



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-gradually-slowing-down-with-large-sql-results-tp5741560.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message