camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1434669 - in /camel/trunk/components/camel-sql/src: main/java/org/apache/camel/component/sql/ test/java/org/apache/camel/component/sql/
Date Thu, 17 Jan 2013 13:45:55 GMT
Author: davsclaus
Date: Thu Jan 17 13:45:55 2013
New Revision: 1434669

URL: http://svn.apache.org/viewvc?rev=1434669&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after processing etc.

Added:
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
      - copied, changed from r1434662, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
      - copied, changed from r1434662, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Modified:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
(original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
Thu Jan 17 13:45:55 2013
@@ -60,5 +60,27 @@ public class DefaultSqlProcessingStrateg
         });
     }
 
+    @Override
+    public int commitBatchComplete(final SqlEndpoint endpoint, final JdbcTemplate jdbcTemplate,
final String query) throws Exception {
+        final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query,
endpoint.isAllowNamedParameters());
+
+        return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>()
{
+            public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException
{
+                int expected = ps.getParameterMetaData().getParameterCount();
+                if (expected != 0) {
+                    throw new IllegalArgumentException("Query onConsumeBatchComplete " +
query + " cannot have parameters, was " + expected);
+                }
+
+                LOG.trace("Execute query {}", query);
+                ps.execute();
+
+                int updateCount = ps.getUpdateCount();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Update count {}", updateCount);
+                }
+                return updateCount;
+            };
+        });
+    }
 }
 

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
(original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
Thu Jan 17 13:45:55 2013
@@ -59,9 +59,17 @@ public class SqlComponent extends Defaul
         if (onConsume != null) {
             onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
         }
+        String onConsumeBatchComplete = getAndRemoveParameter(parameters, "consumer.onConsumeBatchComplete",
String.class);
+        if (onConsumeBatchComplete == null) {
+            onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete",
String.class);
+        }
+        if (onConsumeBatchComplete != null) {
+            onConsumeBatchComplete = onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute,
"?");
+        }
 
         SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
         endpoint.setOnConsume(onConsume);
+        endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
         return endpoint;
     }
 

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
(original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
Thu Jan 17 13:45:55 2013
@@ -46,6 +46,7 @@ public class SqlConsumer extends Schedul
     private final JdbcTemplate jdbcTemplate;
 
     private String onConsume;
+    private String onConsumeBatchComplete;
     private boolean useIterator = true;
     private boolean routeEmptyResultSet;
     private int expectedUpdateCount = -1;
@@ -178,6 +179,19 @@ public class SqlConsumer extends Schedul
             }
         }
 
+        try {
+            if (onConsumeBatchComplete != null) {
+                int updateCount = getEndpoint().getProcessingStrategy().commitBatchComplete(getEndpoint(),
jdbcTemplate, onConsumeBatchComplete);
+                log.debug("onConsumeBatchComplete update count {}", updateCount);
+            }
+        } catch (Exception e) {
+            if (breakBatchOnConsumeFail) {
+                throw e;
+            } else {
+                handleException("Error executing onConsumeBatchComplete query " + onConsumeBatchComplete,
e);
+            }
+        }
+
         return total;
     }
 
@@ -197,6 +211,14 @@ public class SqlConsumer extends Schedul
         this.onConsume = onConsume;
     }
 
+    public String getOnConsumeBatchComplete() {
+        return onConsumeBatchComplete;
+    }
+
+    public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+        this.onConsumeBatchComplete = onConsumeBatchComplete;
+    }
+
     /**
      * Indicates how resultset should be delivered to the route
      */

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
(original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
Thu Jan 17 13:45:55 2013
@@ -37,6 +37,7 @@ public class SqlEndpoint extends Default
     private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
     private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy();
     private String onConsume;
+    private String onConsumeBatchComplete;
     private boolean allowNamedParameters = true;
 
     // TODO: onConsumeBatchDone to execute a query when batch done
@@ -54,6 +55,7 @@ public class SqlEndpoint extends Default
         SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
+        consumer.setOnConsumeBatchComplete(getOnConsumeBatchComplete());
         configureConsumer(consumer);
         return consumer;
     }
@@ -122,6 +124,14 @@ public class SqlEndpoint extends Default
         this.onConsume = onConsume;
     }
 
+    public String getOnConsumeBatchComplete() {
+        return onConsumeBatchComplete;
+    }
+
+    public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+        this.onConsumeBatchComplete = onConsumeBatchComplete;
+    }
+
     public boolean isAllowNamedParameters() {
         return allowNamedParameters;
     }

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
(original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
Thu Jan 17 13:45:55 2013
@@ -36,4 +36,16 @@ public interface SqlProcessingStrategy {
      * @throws Exception can be thrown in case of error
      */
     int commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate,
String query) throws Exception;
+
+    /**
+     * Commit callback when the batch is complete. This allows you to do one extra query
after all rows has been processed in the batch.
+     *
+     * @param endpoint     the endpoint
+     * @param jdbcTemplate The JDBC template
+     * @param query        The SQL query to execute
+     * @return the update count if the query returned an update count
+     * @throws Exception can be thrown in case of error
+     */
+    int commitBatchComplete(SqlEndpoint endpoint, JdbcTemplate jdbcTemplate, String query)
throws Exception;
+
 }

Copied: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
(from r1434662, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1434662&r2=1434669&rev=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
Thu Jan 17 13:45:55 2013
@@ -16,11 +16,6 @@
  */
 package org.apache.camel.component.sql;
 
-import java.util.List;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +30,7 @@ import org.springframework.jdbc.datasour
 /**
  *
  */
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteBatchCompleteTest extends CamelTestSupport {
 
     private EmbeddedDatabase db;
     private JdbcTemplate jdbcTemplate;
@@ -60,24 +55,13 @@ public class SqlConsumerDeleteTest exten
     @Test
     public void testConsume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(3);
+        mock.expectedMessageCount(1);
 
         assertMockEndpointsSatisfied();
 
-        List<Exchange> exchanges = mock.getReceivedExchanges();
-        assertEquals(3, exchanges.size());
-
-        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
-        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
-
         // give it a little tine to delete
         Thread.sleep(500);
 
-        // there should only be 1 row in the table
         assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select
count(*) from projects"));
     }
 
@@ -88,7 +72,7 @@ public class SqlConsumerDeleteTest exten
             public void configure() throws Exception {
                 getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
 
-                from("sql:select * from projects order by id?consumer.onConsume=delete from
projects where id = :#id")
+                from("sql:select * from projects order by id?consumer.onConsumeBatchComplete=delete
from projects")
                     .to("mock:result");
             }
         };

Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Thu Jan 17 13:45:55 2013
@@ -77,7 +77,6 @@ public class SqlConsumerDeleteTest exten
         // give it a little tine to delete
         Thread.sleep(500);
 
-        // there should only be 1 row in the table
         assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select
count(*) from projects"));
     }
 

Copied: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
(from r1434662, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1434662&r2=1434669&rev=1434669&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
Thu Jan 17 13:45:55 2013
@@ -16,11 +16,6 @@
  */
 package org.apache.camel.component.sql;
 
-import java.util.List;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +30,7 @@ import org.springframework.jdbc.datasour
 /**
  *
  */
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteTransformTest extends CamelTestSupport {
 
     private EmbeddedDatabase db;
     private JdbcTemplate jdbcTemplate;
@@ -60,24 +55,13 @@ public class SqlConsumerDeleteTest exten
     @Test
     public void testConsume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(3);
+        mock.expectedBodiesReceived("The project is Camel", "The project is AMQ", "The project
is Linux");
 
         assertMockEndpointsSatisfied();
 
-        List<Exchange> exchanges = mock.getReceivedExchanges();
-        assertEquals(3, exchanges.size());
-
-        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
-        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
-
         // give it a little tine to delete
         Thread.sleep(500);
 
-        // there should only be 1 row in the table
         assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select
count(*) from projects"));
     }
 
@@ -88,7 +72,10 @@ public class SqlConsumerDeleteTest exten
             public void configure() throws Exception {
                 getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
 
+                // even if we transform the exchange we can still do onConsume as we have
the original data at
+                // the point when onConsume is executed
                 from("sql:select * from projects order by id?consumer.onConsume=delete from
projects where id = :#id")
+                    .transform().simple("The project is ${body[project]}")
                     .to("mock:result");
             }
         };



Mime
View raw message