Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3756C1842E for ; Mon, 5 Oct 2015 09:50:32 +0000 (UTC) Received: (qmail 6352 invoked by uid 500); 5 Oct 2015 09:50:27 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 6194 invoked by uid 500); 5 Oct 2015 09:50:27 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 5981 invoked by uid 99); 5 Oct 2015 09:50:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Oct 2015 09:50:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 79F96E0286; Mon, 5 Oct 2015 09:50:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Mon, 05 Oct 2015 09:50:30 -0000 Message-Id: In-Reply-To: <9b0a007fe16a435b8e75fb89d53d995f@git.apache.org> References: <9b0a007fe16a435b8e75fb89d53d995f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/10] camel git commit: CAMEL-9162: camel-elsql component CAMEL-9162: camel-elsql component Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f65b0491 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f65b0491 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f65b0491 Branch: refs/heads/master Commit: f65b0491f25a94089b547937d46ef5638580ee7a Parents: 3dd8056 Author: Claus Ibsen Authored: Mon Oct 5 10:35:04 2015 +0200 Committer: Claus Ibsen Committed: Mon Oct 5 10:54:54 2015 +0200 ---------------------------------------------------------------------- .../camel/component/elsql/ElsqlComponent.java | 24 ++--- .../camel/component/elsql/ElsqlConsumer.java | 7 +- .../camel/component/elsql/ElsqlEndpoint.java | 16 ++- .../camel/component/elsql/ElsqlProducer.java | 5 +- .../component/elsql/ElsqlSqlMapSource.java | 49 ++++++--- .../elsql/ElsqlSqlProcessingStrategy.java | 51 ++++++++-- .../elsql/ElSqlConsumerDeleteTest.java | 100 +++++++++++++++++++ .../src/test/resources/elsql/projects.elsql | 4 + .../src/test/resources/log4j.properties | 2 +- .../apache/camel/component/sql/SqlConsumer.java | 48 +++++++-- .../sql/SqlNamedProcessingStrategy.java | 57 +++++++++++ 11 files changed, 307 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java index 40e6530..51142e8 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java @@ -70,34 +70,23 @@ public class ElsqlComponent extends UriEndpointComponent { throw new IllegalArgumentException("Invalid uri. Must by elsql:elsqlName/resourceUri, was: " + uri); } - /* TODO: add this later String onConsume = getAndRemoveParameter(parameters, "consumer.onConsume", String.class); if (onConsume == null) { onConsume = getAndRemoveParameter(parameters, "onConsume", String.class); } - if (onConsume != null && isUsePlaceholder()) { - onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?"); - } String onConsumeFailed = getAndRemoveParameter(parameters, "consumer.onConsumeFailed", String.class); if (onConsumeFailed == null) { onConsumeFailed = getAndRemoveParameter(parameters, "onConsumeFailed", String.class); } - if (onConsumeFailed != null && isUsePlaceholder()) { - onConsumeFailed = onConsumeFailed.replaceAll(parameterPlaceholderSubstitute, "?"); - } String onConsumeBatchComplete = getAndRemoveParameter(parameters, "consumer.onConsumeBatchComplete", String.class); if (onConsumeBatchComplete == null) { onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete", String.class); } - if (onConsumeBatchComplete != null && isUsePlaceholder()) { - onConsumeBatchComplete = onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?"); - } - */ ElsqlEndpoint endpoint = new ElsqlEndpoint(uri, this, jdbcTemplate, elsqlName, resUri); -// endpoint.setOnConsume(onConsume); -// endpoint.setOnConsumeFailed(onConsumeFailed); -// endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete); + endpoint.setOnConsume(onConsume); + endpoint.setOnConsumeFailed(onConsumeFailed); + endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete); endpoint.setDataSource(ds); endpoint.setDataSourceRef(dataSourceRef); endpoint.setElSqlConfig(elSqlConfig); @@ -118,6 +107,13 @@ public class ElsqlComponent extends UriEndpointComponent { super.doStop(); } + /** + * Sets the DataSource to use to communicate with the database. + */ + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } + public DataSource getDataSource() { return dataSource; } http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java index 9459241..530dc23 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java @@ -21,12 +21,13 @@ import org.apache.camel.component.sql.DefaultSqlEndpoint; import org.apache.camel.component.sql.SqlConsumer; import org.apache.camel.component.sql.SqlPrepareStatementStrategy; import org.apache.camel.component.sql.SqlProcessingStrategy; -import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; public class ElsqlConsumer extends SqlConsumer { - public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, + public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, NamedParameterJdbcTemplate namedJdbcTemplate, String query, SqlParameterSource parameterSource, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) { - super(endpoint, processor, jdbcTemplate, query, sqlPrepareStatementStrategy, sqlProcessingStrategy); + super(endpoint, processor, namedJdbcTemplate, query, parameterSource, sqlPrepareStatementStrategy, sqlProcessingStrategy); } } http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java index a07b93e..d2b2cbf 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java @@ -20,6 +20,7 @@ import java.net.URL; import com.opengamma.elsql.ElSql; import com.opengamma.elsql.ElSqlConfig; +import com.opengamma.elsql.SpringSqlParams; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -33,12 +34,17 @@ import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ResourceHelper; -import org.springframework.jdbc.core.JdbcTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; @UriEndpoint(scheme = "elsql", title = "SQL", syntax = "elsql:elsqlName:resourceUri", consumerClass = ElsqlConsumer.class, label = "database,sql") public class ElsqlEndpoint extends DefaultSqlEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(ElsqlEndpoint.class); + private volatile ElSql elSql; private NamedParameterJdbcTemplate namedJdbcTemplate; @@ -59,12 +65,14 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { - SqlProcessingStrategy proStrategy = new ElsqlSqlProcessingStrategy(elsqlName, elSql); + SqlProcessingStrategy proStrategy = new ElsqlSqlProcessingStrategy(elSql); SqlPrepareStatementStrategy preStategy = new ElsqlSqlPrepareStatementStrategy(); - JdbcTemplate template = new JdbcTemplate(getDataSource()); + final SqlParameterSource param = new EmptySqlParameterSource(); + final String sql = elSql.getSql(elsqlName, new SpringSqlParams(param)); + LOG.debug("ElsqlConsumer @{} using sql: {}", elsqlName, sql); - ElsqlConsumer consumer = new ElsqlConsumer(this, processor, template, elsqlName, preStategy, proStrategy); + ElsqlConsumer consumer = new ElsqlConsumer(this, processor, namedJdbcTemplate, sql, param, preStategy, proStrategy); consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); consumer.setOnConsume(getOnConsume()); consumer.setOnConsumeFailed(getOnConsumeFailed()); http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java index 4353d9a..78d0d2e 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java @@ -27,6 +27,8 @@ import org.apache.camel.Exchange; import org.apache.camel.component.sql.SqlConstants; import org.apache.camel.component.sql.SqlOutputType; import org.apache.camel.impl.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.PreparedStatementCallback; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -36,6 +38,7 @@ import static org.springframework.jdbc.support.JdbcUtils.closeResultSet; public class ElsqlProducer extends DefaultProducer { + private static final Logger LOG = LoggerFactory.getLogger(ElsqlProducer.class); private final ElSql elSql; private final String elSqlName; private final NamedParameterJdbcTemplate jdbcTemplate; @@ -58,7 +61,7 @@ public class ElsqlProducer extends DefaultProducer { final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data); final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param)); - log.debug("ElSql @{} using sql: {}", elSqlName, sql); + LOG.debug("ElsqlProducer @{} using sql: {}", elSqlName, sql); jdbcTemplate.execute(sql, param, new PreparedStatementCallback() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java index e8035b0..aec8d46 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java @@ -20,10 +20,16 @@ import java.util.Collections; import java.util.Map; import org.apache.camel.Exchange; -import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.jdbc.core.namedparam.AbstractSqlParameterSource; -public class ElsqlSqlMapSource extends MapSqlParameterSource { +/** + * A {@link org.springframework.jdbc.core.namedparam.SqlParameterSource} that is used by {@link com.opengamma.elsql.ElSql} + * to lookup parameter values. This source will lookup in the Camel {@link Exchange} and {@link org.apache.camel.Message} + * assuming they are Map based. + */ +public class ElsqlSqlMapSource extends AbstractSqlParameterSource { + // use the maps from the Camel Message as they are case insensitive which makes it easier for end users to work with private final Exchange exchange; private final Map bodyMap; private final Map headersMap; @@ -32,23 +38,36 @@ public class ElsqlSqlMapSource extends MapSqlParameterSource { this.exchange = exchange; this.bodyMap = safeMap(exchange.getContext().getTypeConverter().tryConvertTo(Map.class, body)); this.headersMap = safeMap(exchange.getIn().getHeaders()); - - addValue("body", body); - - for (Map.Entry entry : bodyMap.entrySet()) { - String name = entry.getKey().toString(); - Object value = entry.getValue(); - addValue(name, value); - } - for (Map.Entry entry : headersMap.entrySet()) { - String name = entry.getKey().toString(); - Object value = entry.getValue(); - addValue(name, value); - } } private static Map safeMap(Map map) { return (map == null || map.isEmpty()) ? Collections.emptyMap() : map; } + @Override + public boolean hasValue(String paramName) { + if ("body".equals(paramName)) { + return true; + } else if (bodyMap.containsKey(paramName)) { + return true; + } else if (headersMap.containsKey(paramName)) { + return true; + } else { + return false; + } + } + + @Override + public Object getValue(String paramName) throws IllegalArgumentException { + Object answer; + if ("body".equals(paramName)) { + answer = exchange.getIn().getBody(); + } else { + answer = bodyMap.get(paramName); + if (answer == null) { + headersMap.get(paramName); + } + } + return answer; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java index ea933d8..4180edd 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java @@ -23,32 +23,56 @@ import com.opengamma.elsql.ElSql; import com.opengamma.elsql.SpringSqlParams; import org.apache.camel.Exchange; import org.apache.camel.component.sql.DefaultSqlEndpoint; -import org.apache.camel.component.sql.SqlProcessingStrategy; +import org.apache.camel.component.sql.SqlNamedProcessingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; +import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.core.namedparam.SqlParameterSource; -public class ElsqlSqlProcessingStrategy implements SqlProcessingStrategy { +public class ElsqlSqlProcessingStrategy implements SqlNamedProcessingStrategy { private static final Logger LOG = LoggerFactory.getLogger(ElsqlSqlProcessingStrategy.class); - private final String elSqlName; private final ElSql elSql; - public ElsqlSqlProcessingStrategy(String elSqlName, ElSql elSql) { - this.elSqlName = elSqlName; + public ElsqlSqlProcessingStrategy(ElSql elSql) { this.elSql = elSql; } @Override - public int commit(final DefaultSqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception { + public int commit(DefaultSqlEndpoint defaultSqlEndpoint, Exchange exchange, Object data, NamedParameterJdbcTemplate jdbcTemplate, + SqlParameterSource parameterSource, String query) throws Exception { + final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data); - final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param)); - LOG.debug("ElSql @{} using sql: {}", elSqlName, sql); + final String sql = elSql.getSql(query, new SpringSqlParams(param)); + LOG.debug("commit @{} using sql: {}", query, sql); + + return jdbcTemplate.execute(sql, param, new PreparedStatementCallback() { + @Override + public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException { + ps.execute(); + + int updateCount = ps.getUpdateCount(); + if (LOG.isTraceEnabled()) { + LOG.trace("Update count {}", updateCount); + } + return updateCount; + } + }); + } + + @Override + public int commitBatchComplete(DefaultSqlEndpoint endpoint, NamedParameterJdbcTemplate namedJdbcTemplate, + SqlParameterSource parameterSource, String query) throws Exception { + + final SqlParameterSource param = new EmptySqlParameterSource(); + final String sql = elSql.getSql(query, new SpringSqlParams(param)); + LOG.debug("commitBatchComplete @{} using sql: {}", query, sql); - return jdbcTemplate.execute(sql, new PreparedStatementCallback() { + return namedJdbcTemplate.execute(sql, param, new PreparedStatementCallback() { @Override public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException { ps.execute(); @@ -63,7 +87,12 @@ public class ElsqlSqlProcessingStrategy implements SqlProcessingStrategy { } @Override - public int commitBatchComplete(final DefaultSqlEndpoint endpoint, final JdbcTemplate jdbcTemplate, final String query) throws Exception { - return 0; + public int commit(DefaultSqlEndpoint defaultSqlEndpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public int commitBatchComplete(DefaultSqlEndpoint defaultSqlEndpoint, JdbcTemplate jdbcTemplate, String query) throws Exception { + throw new UnsupportedOperationException("Should not be called"); } } http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java new file mode 100644 index 0000000..a381bf5 --- /dev/null +++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.elsql; + +import java.util.List; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +/** + * + */ +public class ElSqlConsumerDeleteTest extends CamelTestSupport { + + private EmbeddedDatabase db; + private JdbcTemplate jdbcTemplate; + + @Before + public void setUp() throws Exception { + db = new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build(); + + jdbcTemplate = new JdbcTemplate(db); + + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + + db.shutdown(); + } + + @Test + public void testConsume() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(3); + + assertMockEndpointsSatisfied(); + + List exchanges = mock.getReceivedExchanges(); + assertEquals(3, exchanges.size()); + + assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID")); + assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT")); + assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID")); + assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT")); + assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID")); + assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT")); + + // some servers may be a bit slow for this + for (int i = 0; i < 5; i++) { + // give it a little tine to delete + Thread.sleep(1000); + int rows = jdbcTemplate.queryForObject("select count(*) from projects", Integer.class); + if (rows == 0) { + break; + } + } + assertEquals("Should have deleted all 3 rows", new Integer(0), jdbcTemplate.queryForObject("select count(*) from projects", Integer.class)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + getContext().getComponent("elsql", ElsqlComponent.class).setDataSource(db); + + from("elsql:allProjects:elsql/projects.elsql?consumer.onConsume=deleteProject") + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/resources/elsql/projects.elsql ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/test/resources/elsql/projects.elsql b/components/camel-elsql/src/test/resources/elsql/projects.elsql index de60eef..1d957ef 100644 --- a/components/camel-elsql/src/test/resources/elsql/projects.elsql +++ b/components/camel-elsql/src/test/resources/elsql/projects.elsql @@ -7,3 +7,7 @@ SELECT * FROM projects ORDER BY id +@NAME(deleteProject) + DELETE + FROM projects + WHERE id = :id http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-elsql/src/test/resources/log4j.properties b/components/camel-elsql/src/test/resources/log4j.properties index d5af410..82e5ef4 100755 --- a/components/camel-elsql/src/test/resources/log4j.properties +++ b/components/camel-elsql/src/test/resources/log4j.properties @@ -18,7 +18,7 @@ # # The logging properties used for testing # -log4j.rootLogger=INFO, out +log4j.rootLogger=INFO, file #log4j.logger.org.apache.camel.component.sql=DEBUG #log4j.logger.org.apache.camel.component.sql=TRACE http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java index 1187881..0f52280 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java @@ -33,6 +33,8 @@ import org.apache.camel.util.ObjectHelper; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; import static org.springframework.jdbc.support.JdbcUtils.closeResultSet; @@ -40,6 +42,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { private final String query; private final JdbcTemplate jdbcTemplate; + private final NamedParameterJdbcTemplate namedJdbcTemplate; + private final SqlParameterSource parameterSource; private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy; private final SqlProcessingStrategy sqlProcessingStrategy; @@ -59,11 +63,23 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { } } - public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, - SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) { + public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) { super(endpoint, processor); this.jdbcTemplate = jdbcTemplate; + this.namedJdbcTemplate = null; + this.query = query; + this.parameterSource = null; + this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy; + this.sqlProcessingStrategy = sqlProcessingStrategy; + } + + public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, NamedParameterJdbcTemplate namedJdbcTemplate, String query, SqlParameterSource parameterSource, + SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) { + super(endpoint, processor); + this.jdbcTemplate = null; + this.namedJdbcTemplate = namedJdbcTemplate; this.query = query; + this.parameterSource = parameterSource; this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy; this.sqlProcessingStrategy = sqlProcessingStrategy; } @@ -80,8 +96,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { pendingExchanges = 0; final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, getEndpoint().isAllowNamedParameters()); - - Integer messagePolled = jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback() { + final PreparedStatementCallback callback = new PreparedStatementCallback() { @Override public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException, DataAccessException { Queue answer = new LinkedList(); @@ -114,7 +129,14 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { throw ObjectHelper.wrapRuntimeCamelException(e); } } - }); + }; + + Integer messagePolled; + if (namedJdbcTemplate != null) { + messagePolled = namedJdbcTemplate.execute(preparedQuery, parameterSource, callback); + } else { + messagePolled = jdbcTemplate.execute(preparedQuery, callback); + } return messagePolled; } @@ -189,7 +211,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { try { // we can only run on consume if there was data if (data != null && sql != null) { - int updateCount = sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql); + int updateCount; + if (namedJdbcTemplate != null && sqlProcessingStrategy instanceof SqlNamedProcessingStrategy) { + SqlNamedProcessingStrategy namedProcessingStrategy = (SqlNamedProcessingStrategy) sqlProcessingStrategy; + updateCount = namedProcessingStrategy.commit(getEndpoint(), exchange, data, namedJdbcTemplate, parameterSource, sql); + } else { + updateCount = sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql); + } if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) { String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql; throw new SQLException(msg); @@ -206,7 +234,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { try { if (onConsumeBatchComplete != null) { - int updateCount = sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete); + int updateCount; + if (namedJdbcTemplate != null && sqlProcessingStrategy instanceof SqlNamedProcessingStrategy) { + SqlNamedProcessingStrategy namedProcessingStrategy = (SqlNamedProcessingStrategy) sqlProcessingStrategy; + updateCount = namedProcessingStrategy.commitBatchComplete(getEndpoint(), namedJdbcTemplate, parameterSource, onConsumeBatchComplete); + } else { + updateCount = sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete); + } log.debug("onConsumeBatchComplete update count {}", updateCount); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java new file mode 100644 index 0000000..cae9389 --- /dev/null +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import org.apache.camel.Exchange; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; + +/** + * Extended processing strategy for dealing with SQL when consuming, which uses a {@link NamedParameterJdbcTemplate} + * instead of plain {@link org.springframework.jdbc.core.JdbcTemplate}. + */ +public interface SqlNamedProcessingStrategy extends SqlProcessingStrategy { + + /** + * Commit callback if there are a query to be run after processing. + * + * @param endpoint the endpoint + * @param exchange The exchange after it has been processed + * @param data The original data delivered to the route + * @param namedJdbcTemplate The JDBC template + * @param parameterSource Parameter sources for the named JDBC template + * @param query The SQL query to execute + * @return the update count if the query returned an update count + * @throws Exception can be thrown in case of error + */ + int commit(DefaultSqlEndpoint endpoint, Exchange exchange, Object data, + NamedParameterJdbcTemplate namedJdbcTemplate, SqlParameterSource parameterSource, String query) throws Exception; + + /** + * Commit callback when the batch is complete. This allows you to do one extra query after all rows has been processed in the batch. + * + * @param endpoint the endpoint + * @param namedJdbcTemplate The JDBC template + * @param parameterSource Parameter sources for the named JDBC template + * @param query The SQL query to execute + * @return the update count if the query returned an update count + * @throws Exception can be thrown in case of error + */ + int commitBatchComplete(DefaultSqlEndpoint endpoint, NamedParameterJdbcTemplate namedJdbcTemplate, + SqlParameterSource parameterSource, String query) throws Exception; + +}