Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6B7F77EF7 for ; Tue, 29 Nov 2011 21:34:53 +0000 (UTC) Received: (qmail 17453 invoked by uid 500); 29 Nov 2011 21:34:53 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 17413 invoked by uid 500); 29 Nov 2011 21:34:53 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 17406 invoked by uid 99); 29 Nov 2011 21:34:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Nov 2011 21:34:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Nov 2011 21:34:49 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D7BC923889D7 for ; Tue, 29 Nov 2011 21:34:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1208088 - in /camel/branches/camel-2.7.x/components/camel-sql/src: main/java/org/apache/camel/component/sql/SqlEndpoint.java main/java/org/apache/camel/component/sql/SqlProducer.java test/java/org/apache/camel/component/sql/SqlRouteTest.java Date: Tue, 29 Nov 2011 21:34:27 -0000 To: commits@camel.apache.org From: cmueller@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111129213427.D7BC923889D7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cmueller Date: Tue Nov 29 21:34:26 2011 New Revision: 1208088 URL: http://svn.apache.org/viewvc?rev=1208088&view=rev Log: CAMEL-4662: add batching support to sql component Thank you Daniel Gredler for the patch Modified: camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java Modified: camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1208088&r1=1208087&r2=1208088&view=diff ============================================================================== --- camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java (original) +++ camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java Tue Nov 29 21:34:26 2011 @@ -31,6 +31,7 @@ import org.springframework.jdbc.core.Jdb public class SqlEndpoint extends DefaultEndpoint { private JdbcTemplate jdbcTemplate; private String query; + private boolean batch; public SqlEndpoint() { } @@ -46,7 +47,7 @@ public class SqlEndpoint extends Default } public Producer createProducer() throws Exception { - return new SqlProducer(this, query, jdbcTemplate); + return new SqlProducer(this, query, jdbcTemplate, batch); } public boolean isSingleton() { @@ -69,6 +70,14 @@ public class SqlEndpoint extends Default this.query = query; } + public boolean isBatch() { + return batch; + } + + public void setBatch(boolean batch) { + this.batch = batch; + } + @Override protected String createEndpointUri() { return "sql:" + query; Modified: camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java?rev=1208088&r1=1208087&r2=1208088&view=diff ============================================================================== --- camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java (original) +++ camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java Tue Nov 29 21:34:26 2011 @@ -20,10 +20,10 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; -import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ColumnMapRowMapper; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; @@ -32,44 +32,56 @@ import org.springframework.jdbc.core.Row public class SqlProducer extends DefaultProducer { private String query; private JdbcTemplate jdbcTemplate; + private boolean batch; - public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate) { + public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, boolean batch) { super(endpoint); this.jdbcTemplate = jdbcTemplate; this.query = query; + this.batch = batch; } - @SuppressWarnings("unchecked") public void process(final Exchange exchange) throws Exception { - jdbcTemplate.execute(query, new PreparedStatementCallback() { - public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException { - int argNumber = 1; - - // number of parameters must match + jdbcTemplate.execute(query, new PreparedStatementCallback>() { + public Map doInPreparedStatement(PreparedStatement ps) throws SQLException { int expected = ps.getParameterMetaData().getParameterCount(); - if (expected > 0 && exchange.getIn().getBody() != null) { + // transfer incoming message body data to prepared statement parameters, if necessary + if (exchange.getIn().getBody() != null) { Iterator iterator = exchange.getIn().getBody(Iterator.class); - while (iterator != null && iterator.hasNext()) { - ps.setObject(argNumber++, iterator.next()); + + if (batch) { + while (iterator != null && iterator.hasNext()) { + Object value = iterator.next(); + Iterator i = exchange.getContext().getTypeConverter().convertTo(Iterator.class, value); + populateStatement(ps, i, expected); + ps.addBatch(); + } + } else { + populateStatement(ps, iterator, expected); } } - if (argNumber - 1 != expected) { - throw new SQLException("Number of parameters mismatch. Expected: " + expected + ", was:" + (argNumber - 1)); - } - - boolean isResultSet = ps.execute(); - - if (isResultSet) { - RowMapperResultSetExtractor mapper = new RowMapperResultSetExtractor(new ColumnMapRowMapper()); - List result = (List) mapper.extractData(ps.getResultSet()); - exchange.getOut().setBody(result); - exchange.getIn().setHeader(SqlConstants.SQL_ROW_COUNT, result.size()); - // preserve headers - exchange.getOut().setHeaders(exchange.getIn().getHeaders()); + // execute the prepared statement and populate the outgoing message + if (batch) { + int[] updateCounts = ps.executeBatch(); + int total = 0; + for (int count : updateCounts) { + total += count; + } + exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total); } else { - exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); + boolean isResultSet = ps.execute(); + if (isResultSet) { + RowMapperResultSetExtractor> mapper = new RowMapperResultSetExtractor>(new ColumnMapRowMapper()); + List> result = mapper.extractData(ps.getResultSet()); + exchange.getOut().setBody(result); + exchange.getIn().setHeader(SqlConstants.SQL_ROW_COUNT, result.size()); + // preserve headers + exchange.getOut().setHeaders(exchange.getIn().getHeaders()); + } else { + exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount()); + } } // data is set on exchange so return null @@ -78,4 +90,19 @@ public class SqlProducer extends Default }); } + private void populateStatement(PreparedStatement ps, Iterator iterator, int expectedParams) throws SQLException { + int argNumber = 1; + if (expectedParams > 0) { + while (iterator != null && iterator.hasNext()) { + Object value = iterator.next(); + log.trace("Setting parameter #{} with value: {}", argNumber, value); + ps.setObject(argNumber, value); + argNumber++; + } + } + + if (argNumber - 1 != expectedParams) { + throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1)); + } + } } Modified: camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java?rev=1208088&r1=1208087&r2=1208088&view=diff ============================================================================== --- camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java (original) +++ camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java Tue Nov 29 21:34:26 2011 @@ -17,6 +17,7 @@ package org.apache.camel.component.sql; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -31,6 +32,7 @@ import org.junit.Before; import org.junit.Test; import org.springframework.dao.DataAccessException; import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.UncategorizedSQLException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.SingleConnectionDataSource; @@ -176,33 +178,78 @@ public class SqlRouteTest extends CamelT assertEquals("Camel", row.get("PROJECT")); } + @Test + @SuppressWarnings("unchecked") + public void testBatch() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + List data = Arrays.asList(Arrays.asList(6, "abc", "def"), Arrays.asList(7, "ghi", "jkl"), Arrays.asList(8, "mno", "pqr")); + template.sendBody("direct:batch", data); + mock.assertIsSatisfied(); + Number received = assertIsInstanceOf(Number.class, mock.getReceivedExchanges().get(0).getIn().getHeader(SqlConstants.SQL_UPDATE_COUNT)); + assertEquals(3, received.intValue()); + assertEquals("abc", jdbcTemplate.queryForObject("select project from projects where id = 6", String.class)); + assertEquals("def", jdbcTemplate.queryForObject("select license from projects where id = 6", String.class)); + assertEquals("ghi", jdbcTemplate.queryForObject("select project from projects where id = 7", String.class)); + assertEquals("jkl", jdbcTemplate.queryForObject("select license from projects where id = 7", String.class)); + assertEquals("mno", jdbcTemplate.queryForObject("select project from projects where id = 8", String.class)); + assertEquals("pqr", jdbcTemplate.queryForObject("select license from projects where id = 8", String.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testBatchMissingParamAtEnd() throws Exception { + try { + List data = Arrays.asList(Arrays.asList(9, "stu", "vwx"), Arrays.asList(10, "yza")); + template.sendBody("direct:batch", data); + fail(); + } catch (RuntimeCamelException e) { + assertTrue(e.getCause() instanceof UncategorizedSQLException); + } + assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 9")); + assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 10")); + } + + @Test + @SuppressWarnings("unchecked") + public void testBatchMissingParamAtBeginning() throws Exception { + try { + List data = Arrays.asList(Arrays.asList(9, "stu"), Arrays.asList(10, "vwx", "yza")); + template.sendBody("direct:batch", data); + fail(); + } catch (RuntimeCamelException e) { + assertTrue(e.getCause() instanceof UncategorizedSQLException); + } + assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 9")); + assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 10")); + } @Before public void setUp() throws Exception { Class.forName(driverClass); - super.setUp(); - + ds = new SingleConnectionDataSource(url, user, password, true); + jdbcTemplate = new JdbcTemplate(ds); - jdbcTemplate.execute("create table projects (id integer primary key," - + "project varchar(10), license varchar(5))"); + jdbcTemplate.execute("create table projects (id integer primary key, project varchar(10), license varchar(5))"); jdbcTemplate.execute("insert into projects values (1, 'Camel', 'ASF')"); jdbcTemplate.execute("insert into projects values (2, 'AMQ', 'ASF')"); jdbcTemplate.execute("insert into projects values (3, 'Linux', 'XXX')"); + + super.setUp(); } @After public void tearDown() throws Exception { super.tearDown(); - JdbcTemplate jdbcTemplate = new JdbcTemplate(ds); + jdbcTemplate.execute("drop table projects"); + ((SingleConnectionDataSource) ds).destroy(); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() { - ds = new SingleConnectionDataSource(url, user, password, true); - getContext().getComponent("sql", SqlComponent.class).setDataSource(ds); errorHandler(noErrorHandler()); @@ -223,6 +270,10 @@ public class SqlRouteTest extends CamelT from("direct:no-param").to("sql:select * from projects order by id").to("mock:result"); from("direct:no-param-insert").to("sql:insert into projects values (5, '#', param)?placeholder=param").to("mock:result"); + + from("direct:batch") + .to("sql:insert into projects values (#, #, #)?batch=true") + .to("mock:result"); } }; }