camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [05/10] camel git commit: CAMEL-9162: camel-elsql component
Date Mon, 05 Oct 2015 09:50:30 GMT
CAMEL-9162: camel-elsql component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f65b0491
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f65b0491
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f65b0491

Branch: refs/heads/master
Commit: f65b0491f25a94089b547937d46ef5638580ee7a
Parents: 3dd8056
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Oct 5 10:35:04 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Oct 5 10:54:54 2015 +0200

----------------------------------------------------------------------
 .../camel/component/elsql/ElsqlComponent.java   |  24 ++---
 .../camel/component/elsql/ElsqlConsumer.java    |   7 +-
 .../camel/component/elsql/ElsqlEndpoint.java    |  16 ++-
 .../camel/component/elsql/ElsqlProducer.java    |   5 +-
 .../component/elsql/ElsqlSqlMapSource.java      |  49 ++++++---
 .../elsql/ElsqlSqlProcessingStrategy.java       |  51 ++++++++--
 .../elsql/ElSqlConsumerDeleteTest.java          | 100 +++++++++++++++++++
 .../src/test/resources/elsql/projects.elsql     |   4 +
 .../src/test/resources/log4j.properties         |   2 +-
 .../apache/camel/component/sql/SqlConsumer.java |  48 +++++++--
 .../sql/SqlNamedProcessingStrategy.java         |  57 +++++++++++
 11 files changed, 307 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
index 40e6530..51142e8 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
@@ -70,34 +70,23 @@ public class ElsqlComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("Invalid uri. Must by elsql:elsqlName/resourceUri,
was: " + uri);
         }
 
-        /* TODO: add this later
         String onConsume = getAndRemoveParameter(parameters, "consumer.onConsume", String.class);
         if (onConsume == null) {
             onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
         }
-        if (onConsume != null && isUsePlaceholder()) {
-            onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
-        }
         String onConsumeFailed = getAndRemoveParameter(parameters, "consumer.onConsumeFailed",
String.class);
         if (onConsumeFailed == null) {
             onConsumeFailed = getAndRemoveParameter(parameters, "onConsumeFailed", String.class);
         }
-        if (onConsumeFailed != null && isUsePlaceholder()) {
-            onConsumeFailed = onConsumeFailed.replaceAll(parameterPlaceholderSubstitute,
"?");
-        }
         String onConsumeBatchComplete = getAndRemoveParameter(parameters, "consumer.onConsumeBatchComplete",
String.class);
         if (onConsumeBatchComplete == null) {
             onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete",
String.class);
         }
-        if (onConsumeBatchComplete != null && isUsePlaceholder()) {
-            onConsumeBatchComplete = onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute,
"?");
-        }
-         */
 
         ElsqlEndpoint endpoint = new ElsqlEndpoint(uri, this, jdbcTemplate, elsqlName, resUri);
-//        endpoint.setOnConsume(onConsume);
-//        endpoint.setOnConsumeFailed(onConsumeFailed);
-//        endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
+        endpoint.setOnConsume(onConsume);
+        endpoint.setOnConsumeFailed(onConsumeFailed);
+        endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
         endpoint.setDataSource(ds);
         endpoint.setDataSourceRef(dataSourceRef);
         endpoint.setElSqlConfig(elSqlConfig);
@@ -118,6 +107,13 @@ public class ElsqlComponent extends UriEndpointComponent {
         super.doStop();
     }
 
+    /**
+     * Sets the DataSource to use to communicate with the database.
+     *
+     * @param dataSource the data source to store on this component
+     */
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
     public DataSource getDataSource() {
         return dataSource;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
index 9459241..530dc23 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
@@ -21,12 +21,13 @@ import org.apache.camel.component.sql.DefaultSqlEndpoint;
 import org.apache.camel.component.sql.SqlConsumer;
 import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
 import org.apache.camel.component.sql.SqlProcessingStrategy;
-import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 public class ElsqlConsumer extends SqlConsumer {
 
-    public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate,
String query,
+    public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, NamedParameterJdbcTemplate
namedJdbcTemplate, String query, SqlParameterSource parameterSource,
                          SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy
sqlProcessingStrategy) {
-        super(endpoint, processor, jdbcTemplate, query, sqlPrepareStatementStrategy, sqlProcessingStrategy);
+        super(endpoint, processor, namedJdbcTemplate, query, parameterSource, sqlPrepareStatementStrategy,
sqlProcessingStrategy);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
index a07b93e..d2b2cbf 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
@@ -20,6 +20,7 @@ import java.net.URL;
 
 import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.ElSqlConfig;
+import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -33,12 +34,17 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ResourceHelper;
-import org.springframework.jdbc.core.JdbcTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 @UriEndpoint(scheme = "elsql", title = "SQL", syntax = "elsql:elsqlName:resourceUri", consumerClass
= ElsqlConsumer.class, label = "database,sql")
 public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ElsqlEndpoint.class);
+
     private volatile ElSql elSql;
     private NamedParameterJdbcTemplate namedJdbcTemplate;
 
@@ -59,12 +65,14 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        SqlProcessingStrategy proStrategy = new ElsqlSqlProcessingStrategy(elsqlName, elSql);
+        SqlProcessingStrategy proStrategy = new ElsqlSqlProcessingStrategy(elSql);
         SqlPrepareStatementStrategy preStategy = new ElsqlSqlPrepareStatementStrategy();
 
-        JdbcTemplate template = new JdbcTemplate(getDataSource());
+        final SqlParameterSource param = new EmptySqlParameterSource();
+        final String sql = elSql.getSql(elsqlName, new SpringSqlParams(param));
+        LOG.debug("ElsqlConsumer @{} using sql: {}", elsqlName, sql);
 
-        ElsqlConsumer consumer = new ElsqlConsumer(this, processor, template, elsqlName,
preStategy, proStrategy);
+        ElsqlConsumer consumer = new ElsqlConsumer(this, processor, namedJdbcTemplate, sql,
param, preStategy, proStrategy);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
         consumer.setOnConsumeFailed(getOnConsumeFailed());

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
index 4353d9a..78d0d2e 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
@@ -27,6 +27,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.sql.SqlConstants;
 import org.apache.camel.component.sql.SqlOutputType;
 import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
@@ -36,6 +38,7 @@ import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
 
 public class ElsqlProducer extends DefaultProducer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ElsqlProducer.class);
     private final ElSql elSql;
     private final String elSqlName;
     private final NamedParameterJdbcTemplate jdbcTemplate;
@@ -58,7 +61,7 @@ public class ElsqlProducer extends DefaultProducer {
 
         final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data);
         final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param));
-        log.debug("ElSql @{} using sql: {}", elSqlName, sql);
+        LOG.debug("ElsqlProducer @{} using sql: {}", elSqlName, sql);
 
         jdbcTemplate.execute(sql, param, new PreparedStatementCallback<Object>() {
             @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
index e8035b0..aec8d46 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
@@ -20,10 +20,16 @@ import java.util.Collections;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.AbstractSqlParameterSource;
 
-public class ElsqlSqlMapSource extends MapSqlParameterSource {
+/**
+ * A {@link org.springframework.jdbc.core.namedparam.SqlParameterSource} that is used by
{@link com.opengamma.elsql.ElSql}
+ * to lookup parameter values. This source will lookup in the Camel {@link Exchange} and
{@link org.apache.camel.Message}
+ * assuming they are Map based.
+ */
+public class ElsqlSqlMapSource extends AbstractSqlParameterSource {
 
+    // use the maps from the Camel Message as they are case insensitive which makes it easier
for end users to work with
     private final Exchange exchange;
     private final Map<?, ?> bodyMap;
     private final Map<?, ?> headersMap;
@@ -32,23 +38,36 @@ public class ElsqlSqlMapSource extends MapSqlParameterSource {
         this.exchange = exchange;
         this.bodyMap = safeMap(exchange.getContext().getTypeConverter().tryConvertTo(Map.class,
body));
         this.headersMap = safeMap(exchange.getIn().getHeaders());
-
-        addValue("body", body);
-
-        for (Map.Entry<?, ?> entry : bodyMap.entrySet()) {
-            String name = entry.getKey().toString();
-            Object value = entry.getValue();
-            addValue(name, value);
-        }
-        for (Map.Entry<?, ?> entry : headersMap.entrySet()) {
-            String name = entry.getKey().toString();
-            Object value = entry.getValue();
-            addValue(name, value);
-        }
     }
 
     private static Map<?, ?> safeMap(Map<?, ?> map) {
         return (map == null || map.isEmpty()) ? Collections.emptyMap() : map;
     }
 
+    /**
+     * Tells whether a value can be looked up for the given parameter name.
+     *
+     * @param paramName the name of the parameter
+     * @return <tt>true</tt> for the name <tt>body</tt>, or when the name is a key in the body map
+     *         or in the headers map, <tt>false</tt> otherwise
+     */
+    @Override
+    public boolean hasValue(String paramName) {
+        return "body".equals(paramName)
+                || bodyMap.containsKey(paramName)
+                || headersMap.containsKey(paramName);
+    }
+
+    /**
+     * Looks up the value for the given parameter name.
+     *
+     * The name <tt>body</tt> resolves to the body of the IN message. Any other name is looked up
+     * in the body map first, and then in the headers map if the body map has no non-null value.
+     *
+     * @param paramName the name of the parameter
+     * @return the value, or <tt>null</tt> if neither the body map nor the headers map has a value
+     */
+    @Override
+    public Object getValue(String paramName) throws IllegalArgumentException {
+        Object answer;
+        if ("body".equals(paramName)) {
+            answer = exchange.getIn().getBody();
+        } else {
+            answer = bodyMap.get(paramName);
+            if (answer == null) {
+                // fallback to the headers, the lookup result must be assigned
+                // as otherwise a header value is never returned
+                answer = headersMap.get(paramName);
+            }
+        }
+        return answer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
index ea933d8..4180edd 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlProcessingStrategy.java
@@ -23,32 +23,56 @@ import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sql.DefaultSqlEndpoint;
-import org.apache.camel.component.sql.SqlProcessingStrategy;
+import org.apache.camel.component.sql.SqlNamedProcessingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
-public class ElsqlSqlProcessingStrategy implements SqlProcessingStrategy {
+public class ElsqlSqlProcessingStrategy implements SqlNamedProcessingStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElsqlSqlProcessingStrategy.class);
-    private final String elSqlName;
     private final ElSql elSql;
 
-    public ElsqlSqlProcessingStrategy(String elSqlName, ElSql elSql) {
-        this.elSqlName = elSqlName;
+    public ElsqlSqlProcessingStrategy(ElSql elSql) {
         this.elSql = elSql;
     }
 
     @Override
-    public int commit(final DefaultSqlEndpoint endpoint, final Exchange exchange, final Object
data, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
+    public int commit(DefaultSqlEndpoint defaultSqlEndpoint, Exchange exchange, Object data,
NamedParameterJdbcTemplate jdbcTemplate,
+                      SqlParameterSource parameterSource, String query) throws Exception
{
+
         final SqlParameterSource param = new ElsqlSqlMapSource(exchange, data);
-        final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param));
-        LOG.debug("ElSql @{} using sql: {}", elSqlName, sql);
+        final String sql = elSql.getSql(query, new SpringSqlParams(param));
+        LOG.debug("commit @{} using sql: {}", query, sql);
+
+        return jdbcTemplate.execute(sql, param, new PreparedStatementCallback<Integer>()
{
+            @Override
+            public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException,
DataAccessException {
+                ps.execute();
+
+                int updateCount = ps.getUpdateCount();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Update count {}", updateCount);
+                }
+                return updateCount;
+            }
+        });
+    }
+
+    @Override
+    public int commitBatchComplete(DefaultSqlEndpoint endpoint, NamedParameterJdbcTemplate
namedJdbcTemplate,
+                            SqlParameterSource parameterSource, String query) throws Exception
{
+
+        final SqlParameterSource param = new EmptySqlParameterSource();
+        final String sql = elSql.getSql(query, new SpringSqlParams(param));
+        LOG.debug("commitBatchComplete @{} using sql: {}", query, sql);
 
-        return jdbcTemplate.execute(sql, new PreparedStatementCallback<Integer>() {
+        return namedJdbcTemplate.execute(sql, param, new PreparedStatementCallback<Integer>()
{
             @Override
             public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException,
DataAccessException {
                 ps.execute();
@@ -63,7 +87,12 @@ public class ElsqlSqlProcessingStrategy implements SqlProcessingStrategy
{
     }
 
     @Override
-    public int commitBatchComplete(final DefaultSqlEndpoint endpoint, final JdbcTemplate
jdbcTemplate, final String query) throws Exception {
-        return 0;
+    public int commit(DefaultSqlEndpoint defaultSqlEndpoint, Exchange exchange, Object data,
JdbcTemplate jdbcTemplate, String query) throws Exception {
+        throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public int commitBatchComplete(DefaultSqlEndpoint defaultSqlEndpoint, JdbcTemplate jdbcTemplate,
String query) throws Exception {
+        throw new UnsupportedOperationException("Should not be called");
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
new file mode 100644
index 0000000..a381bf5
--- /dev/null
+++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDeleteTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elsql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ * Tests consuming from the <tt>allProjects</tt> query with <tt>consumer.onConsume=deleteProject</tt>,
+ * and verifies that the <tt>projects</tt> table is empty after the 3 rows have been consumed.
+ */
+public class ElSqlConsumerDeleteTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+    private JdbcTemplate jdbcTemplate;
+
+    @Before
+    public void setUp() throws Exception {
+        // the route builder hands the database to the elsql component, so create it before calling super
+        db = new EmbeddedDatabaseBuilder()
+                .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        jdbcTemplate = new JdbcTemplate(db);
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // always shutdown the database, also if super failed, and guard against a failed setUp
+            if (db != null) {
+                db.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void testConsume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = mock.getReceivedExchanges();
+        assertEquals(3, exchanges.size());
+
+        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+        // some servers may be a bit slow for this, so poll the row count up to 5 times
+        Integer rows = null;
+        for (int i = 0; i < 5; i++) {
+            // give it a little time to delete
+            Thread.sleep(1000);
+            rows = jdbcTemplate.queryForObject("select count(*) from projects", Integer.class);
+            if (rows != null && rows == 0) {
+                break;
+            }
+        }
+        // assert on the last polled count, there is no need to query one more time
+        assertEquals("Should have deleted all 3 rows", Integer.valueOf(0), rows);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                getContext().getComponent("elsql", ElsqlComponent.class).setDataSource(db);
+
+                from("elsql:allProjects:elsql/projects.elsql?consumer.onConsume=deleteProject")
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/resources/elsql/projects.elsql
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/resources/elsql/projects.elsql b/components/camel-elsql/src/test/resources/elsql/projects.elsql
index de60eef..1d957ef 100644
--- a/components/camel-elsql/src/test/resources/elsql/projects.elsql
+++ b/components/camel-elsql/src/test/resources/elsql/projects.elsql
@@ -7,3 +7,7 @@
   SELECT *
   FROM projects
   ORDER BY id
+@NAME(deleteProject)
+  DELETE
+  FROM projects
+  WHERE id = :id

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-elsql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/resources/log4j.properties b/components/camel-elsql/src/test/resources/log4j.properties
index d5af410..82e5ef4 100755
--- a/components/camel-elsql/src/test/resources/log4j.properties
+++ b/components/camel-elsql/src/test/resources/log4j.properties
@@ -18,7 +18,7 @@
 #
 # The logging properties used for testing
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, file
 
 #log4j.logger.org.apache.camel.component.sql=DEBUG
 #log4j.logger.org.apache.camel.component.sql=TRACE

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 1187881..0f52280 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -33,6 +33,8 @@ import org.apache.camel.util.ObjectHelper;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
 
@@ -40,6 +42,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
 
     private final String query;
     private final JdbcTemplate jdbcTemplate;
+    private final NamedParameterJdbcTemplate namedJdbcTemplate;
+    private final SqlParameterSource parameterSource;
     private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
     private final SqlProcessingStrategy sqlProcessingStrategy;
 
@@ -59,11 +63,23 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
-    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate,
String query,
-                       SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy
sqlProcessingStrategy) {
+    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate,
String query, SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy
sqlProcessingStrategy) {
         super(endpoint, processor);
         this.jdbcTemplate = jdbcTemplate;
+        this.namedJdbcTemplate = null;
+        this.query = query;
+        this.parameterSource = null;
+        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
+        this.sqlProcessingStrategy = sqlProcessingStrategy;
+    }
+
+    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, NamedParameterJdbcTemplate
namedJdbcTemplate, String query, SqlParameterSource parameterSource,
+                       SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy
sqlProcessingStrategy) {
+        super(endpoint, processor);
+        this.jdbcTemplate = null;
+        this.namedJdbcTemplate = namedJdbcTemplate;
         this.query = query;
+        this.parameterSource = parameterSource;
         this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
         this.sqlProcessingStrategy = sqlProcessingStrategy;
     }
@@ -80,8 +96,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         pendingExchanges = 0;
 
         final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, getEndpoint().isAllowNamedParameters());
-
-        Integer messagePolled = jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>()
{
+        final PreparedStatementCallback<Integer> callback = new PreparedStatementCallback<Integer>()
{
             @Override
             public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws
SQLException, DataAccessException {
                 Queue<DataHolder> answer = new LinkedList<DataHolder>();
@@ -114,7 +129,14 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
                     throw ObjectHelper.wrapRuntimeCamelException(e);
                 }
             }
-        });
+        };
+
+        Integer messagePolled;
+        if (namedJdbcTemplate != null) {
+            messagePolled = namedJdbcTemplate.execute(preparedQuery, parameterSource, callback);
+        } else {
+            messagePolled = jdbcTemplate.execute(preparedQuery, callback);
+        }
 
         return messagePolled;
     }
@@ -189,7 +211,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
             try {
                 // we can only run on consume if there was data
                 if (data != null && sql != null) {
-                    int updateCount = sqlProcessingStrategy.commit(getEndpoint(), exchange,
data, jdbcTemplate, sql);
+                    int updateCount;
+                    if (namedJdbcTemplate != null && sqlProcessingStrategy instanceof
SqlNamedProcessingStrategy) {
+                        SqlNamedProcessingStrategy namedProcessingStrategy = (SqlNamedProcessingStrategy)
sqlProcessingStrategy;
+                        updateCount = namedProcessingStrategy.commit(getEndpoint(), exchange,
data, namedJdbcTemplate, parameterSource, sql);
+                    } else {
+                        updateCount = sqlProcessingStrategy.commit(getEndpoint(), exchange,
data, jdbcTemplate, sql);
+                    }
                     if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount)
{
                         String msg = "Expected update count " + expectedUpdateCount + " but
was " + updateCount + " executing query: " + sql;
                         throw new SQLException(msg);
@@ -206,7 +234,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
 
         try {
             if (onConsumeBatchComplete != null) {
-                int updateCount = sqlProcessingStrategy.commitBatchComplete(getEndpoint(),
jdbcTemplate, onConsumeBatchComplete);
+                int updateCount;
+                if (namedJdbcTemplate != null && sqlProcessingStrategy instanceof
SqlNamedProcessingStrategy) {
+                    SqlNamedProcessingStrategy namedProcessingStrategy = (SqlNamedProcessingStrategy)
sqlProcessingStrategy;
+                    updateCount = namedProcessingStrategy.commitBatchComplete(getEndpoint(),
namedJdbcTemplate, parameterSource, onConsumeBatchComplete);
+                } else {
+                    updateCount = sqlProcessingStrategy.commitBatchComplete(getEndpoint(),
jdbcTemplate, onConsumeBatchComplete);
+                }
                 log.debug("onConsumeBatchComplete update count {}", updateCount);
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/f65b0491/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
new file mode 100644
index 0000000..cae9389
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlNamedProcessingStrategy.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Exchange;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+
+/**
+ * Extended processing strategy for dealing with SQL when consuming, which uses a {@link NamedParameterJdbcTemplate}
+ * instead of plain {@link org.springframework.jdbc.core.JdbcTemplate}.
+ */
+public interface SqlNamedProcessingStrategy extends SqlProcessingStrategy {
+
+    /**
+     * Commit callback if there is a query to be run after processing.
+     *
+     * @param endpoint          the endpoint
+     * @param exchange          The exchange after it has been processed
+     * @param data              The original data delivered to the route
+     * @param namedJdbcTemplate The named parameter JDBC template
+     * @param parameterSource   Parameter source for the named JDBC template
+     * @param query             The SQL query to execute
+     * @return the update count if the query returned an update count
+     * @throws Exception can be thrown in case of error
+     */
+    int commit(DefaultSqlEndpoint endpoint, Exchange exchange, Object data,
+               NamedParameterJdbcTemplate namedJdbcTemplate, SqlParameterSource parameterSource,
String query) throws Exception;
+
+    /**
+     * Commit callback when the batch is complete. This allows you to do one extra query after all rows have been processed in the batch.
+     *
+     * @param endpoint          the endpoint
+     * @param namedJdbcTemplate The named parameter JDBC template
+     * @param parameterSource   Parameter source for the named JDBC template
+     * @param query             The SQL query to execute
+     * @return the update count if the query returned an update count
+     * @throws Exception can be thrown in case of error
+     */
+    int commitBatchComplete(DefaultSqlEndpoint endpoint, NamedParameterJdbcTemplate namedJdbcTemplate,
+                            SqlParameterSource parameterSource, String query) throws Exception;
+
+}


Mime
View raw message