camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmuel...@apache.org
Subject svn commit: r1225687 - in /camel/trunk/components/camel-sql/src: main/java/org/apache/camel/processor/idempotent/jdbc/ test/java/org/apache/camel/processor/idempotent/jdbc/ test/resources/org/apache/camel/processor/idempotent/jdbc/
Date Thu, 29 Dec 2011 23:10:55 GMT
Author: cmueller
Date: Thu Dec 29 23:10:55 2011
New Revision: 1225687

URL: http://svn.apache.org/viewvc?rev=1225687&view=rev
Log:
CAMEL-4152: jdbc idempotent repository - table creation

Added:
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/CustomizedJdbcMessageIdRepositoryTest.java
    camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml
Modified:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java?rev=1225687&r1=1225686&r2=1225687&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java Thu Dec 29 23:10:55 2011
@@ -20,6 +20,8 @@ import javax.sql.DataSource;
 
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 import org.springframework.jmx.export.annotation.ManagedOperation;
@@ -48,6 +50,7 @@ public abstract class AbstractJdbcMessag
     protected String processorName;
     protected TransactionTemplate transactionTemplate;
     protected DataSource dataSource;
+    protected transient Logger log = LoggerFactory.getLogger(getClass());
 
     public AbstractJdbcMessageIdRepository() {
         super();

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java?rev=1225687&r1=1225686&r2=1225687&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Thu Dec 29 23:10:55 2011
@@ -19,7 +19,10 @@ package org.apache.camel.processor.idemp
 import java.sql.Timestamp;
 import javax.sql.DataSource;
 
+import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
 /**
@@ -27,9 +30,11 @@ import org.springframework.transaction.s
  */
 public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository<String> {
 
-    public static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
-    public static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)";
-    public static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+    private String tableExistsString = "SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0";
+    private String createString = "CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)";
+    private String queryString = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+    private String insertString = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)";
+    private String deleteString = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
 
     public JdbcMessageIdRepository() {
         super();
@@ -46,20 +51,81 @@ public class JdbcMessageIdRepository ext
     public JdbcMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) {
         super(jdbcTemplate, transactionTemplate);
     }
+    
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        
+        transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            public Boolean doInTransaction(TransactionStatus status) {
+                try {
+                    // we will receive an exception if the table doesn't exists or we cannot access it
+                    jdbcTemplate.execute(tableExistsString);
+                    log.debug("table for JdbcMessageIdRepository already exists");
+                } catch (DataAccessException e) {
+                    log.debug("creating table for JdbcMessageIdRepository because it doesn't exists...");
+                    log.debug("executing query '{}'...", createString);
+                    // we will fail if we cannot create it
+                    jdbcTemplate.execute(createString);
+                    log.info("table created");
+                }
+                return Boolean.TRUE;
+            }
+        });
+    }
 
     @Override
     protected int queryForInt(String key) {
-        return jdbcTemplate.queryForInt(QUERY_STRING, processorName, key);
+        return jdbcTemplate.queryForInt(queryString, processorName, key);
     }
 
     @Override
     protected int insert(String key) {
-        return jdbcTemplate.update(INSERT_STRING, processorName, key, new Timestamp(System.currentTimeMillis()));
+        return jdbcTemplate.update(insertString, processorName, key, new Timestamp(System.currentTimeMillis()));
     }
 
     @Override
     protected int delete(String key) {
-        return jdbcTemplate.update(DELETE_STRING, processorName, key);
+        return jdbcTemplate.update(deleteString, processorName, key);
+    }
+    
+    public String getTableExistsString() {
+        return tableExistsString;
+    }
+
+    public void setTableExistsString(String tableExistsString) {
+        this.tableExistsString = tableExistsString;
+    }
+    
+    public String getCreateString() {
+        return createString;
+    }
+
+    public void setCreateString(String createString) {
+        this.createString = createString;
+    }
+
+    public String getQueryString() {
+        return queryString;
     }
 
+    public void setQueryString(String queryString) {
+        this.queryString = queryString;
+    }
+
+    public String getInsertString() {
+        return insertString;
+    }
+
+    public void setInsertString(String insertString) {
+        this.insertString = insertString;
+    }
+
+    public String getDeleteString() {
+        return deleteString;
+    }
+
+    public void setDeleteString(String deleteString) {
+        this.deleteString = deleteString;
+    }
 }
\ No newline at end of file

Added: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/CustomizedJdbcMessageIdRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/CustomizedJdbcMessageIdRepositoryTest.java?rev=1225687&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/CustomizedJdbcMessageIdRepositoryTest.java (added)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/CustomizedJdbcMessageIdRepositoryTest.java Thu Dec 29 23:10:55 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+public class CustomizedJdbcMessageIdRepositoryTest extends CamelSpringTestSupport {
+
+    protected static final String SELECT_ALL_STRING = "SELECT messageId FROM CUSTOMIZED_CAMEL_MESSAGEPROCESSED WHERE processorName = ?";
+    protected static final String DELETE_ALL_STRING = "DELETE FROM CUSTOMIZED_CAMEL_MESSAGEPROCESSED WHERE processorName = ?";
+    protected static final String PROCESSOR_NAME = "myProcessorName";
+
+    protected JdbcTemplate jdbcTemplate;
+    protected DataSource dataSource;
+    
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @EndpointInject(uri = "mock:error")
+    protected MockEndpoint errorEndpoint;
+    
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        
+        dataSource = context.getRegistry().lookup("dataSource", DataSource.class);
+        jdbcTemplate = new JdbcTemplate(dataSource);
+        jdbcTemplate.afterPropertiesSet();
+        
+        setupRepository();
+    }
+    
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/processor/idempotent/jdbc/customized-spring.xml");
+    }
+
+    protected void setupRepository() {
+        TransactionTemplate transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource));
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+        
+        transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            public Boolean doInTransaction(TransactionStatus status) {
+                jdbcTemplate.update(DELETE_ALL_STRING, PROCESSOR_NAME);
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+    @Test
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+        errorEndpoint.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+
+        assertMockEndpointsSatisfied();
+
+        // all 3 messages should be in jdbc repo
+        List<String> receivedMessageIds = jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME);
+
+        assertEquals(3, receivedMessageIds.size());
+        assertTrue(receivedMessageIds.contains("1"));
+        assertTrue(receivedMessageIds.contains("2"));
+        assertTrue(receivedMessageIds.contains("3"));
+    }
+}
\ No newline at end of file

Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java?rev=1225687&r1=1225686&r2=1225687&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java Thu Dec 29 23:10:55 2011
@@ -22,7 +22,6 @@ import javax.sql.DataSource;
 
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -40,7 +39,6 @@ import org.springframework.transaction.T
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
-
 public class JdbcMessageIdRepositoryTest extends CamelSpringTestSupport {
 
     protected static final String SELECT_ALL_STRING = "SELECT messageId FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?";
@@ -96,12 +94,12 @@ public class JdbcMessageIdRepositoryTest
         resultEndpoint.expectedBodiesReceived("one", "two", "three");
         errorEndpoint.expectedMessageCount(0);
 
-        sendMessage("1", "one");
-        sendMessage("2", "two");
-        sendMessage("1", "one");
-        sendMessage("2", "two");
-        sendMessage("1", "one");
-        sendMessage("3", "three");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
 
         assertMockEndpointsSatisfied();
 
@@ -116,7 +114,7 @@ public class JdbcMessageIdRepositoryTest
 
     @Test
     public void testFailedExchangesNotAdded() throws Exception {
-        RouteBuilder interceptor = new RouteBuilder() {
+        RouteBuilder interceptor = new RouteBuilder(context) {
             @Override
             public void configure() throws Exception {
                 interceptSendToEndpoint("mock:result")
@@ -137,12 +135,12 @@ public class JdbcMessageIdRepositoryTest
         errorEndpoint.expectedMessageCount(2);
         resultEndpoint.expectedBodiesReceived("one", "three");
 
-        sendMessage("1", "one");
-        sendMessage("2", "two");
-        sendMessage("1", "one");
-        sendMessage("2", "two");
-        sendMessage("1", "one");
-        sendMessage("3", "three");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
 
         assertMockEndpointsSatisfied();
 
@@ -153,14 +151,4 @@ public class JdbcMessageIdRepositoryTest
         assertTrue("Should contain message 1", receivedMessageIds.contains("1"));
         assertTrue("Should contain message 3", receivedMessageIds.contains("3"));
     }
-
-    protected void sendMessage(final Object messageId, final Object body) {
-        template.send("direct:start", new Processor() {
-            public void process(Exchange exchange) {
-                Message in = exchange.getIn();
-                in.setBody(body);
-                in.setHeader("messageId", messageId);
-            }
-        });
-    }
 }
\ No newline at end of file

Added: camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml?rev=1225687&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml (added)
+++ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml Thu Dec 29 23:10:55 2011
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:camel="http://camel.apache.org/schema/spring"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
+        <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+        <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
+        <property name="username" value="sa"/>
+        <property name="password" value=""/>
+    </bean>
+    
+    <bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
+    	<constructor-arg ref="dataSource" />
+    	<constructor-arg value="myProcessorName" />
+    	<property name="tableExistsString" value="SELECT 1 FROM CUSTOMIZED_CAMEL_MESSAGEPROCESSED WHERE 1 = 0" />
+    	<property name="createString" value="CREATE TABLE CUSTOMIZED_CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt DATETIME)" />
+    	<property name="queryString" value="SELECT COUNT(*) FROM CUSTOMIZED_CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?" />
+    	<property name="insertString" value="INSERT INTO CUSTOMIZED_CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)" />
+    	<property name="deleteString" value="DELETE FROM CUSTOMIZED_CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?" />
+    </bean>
+    
+    <camel:camelContext>
+    	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
+    		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
+    	</camel:errorHandler>
+    	
+    	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
+    		<camel:from uri="direct:start" />
+    		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
+    			<camel:header>messageId</camel:header>
+    			<camel:to uri="mock:result" />
+    		</camel:idempotentConsumer>
+    	</camel:route>
+    </camel:camelContext>
+</beans>
\ No newline at end of file



Mime
View raw message