camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject svn commit: r711750 - in /activemq/camel/trunk/components/camel-ibatis/src: main/java/org/apache/camel/component/ibatis/ main/java/org/apache/camel/component/ibatis/strategy/ test/java/org/apache/camel/component/ibatis/ test/resources/org/apache/camel/...
Date Thu, 06 Nov 2008 01:21:29 GMT
Author: hadrian
Date: Wed Nov  5 17:21:28 2008
New Revision: 711750

URL: http://svn.apache.org/viewvc?rev=711750&view=rev
Log:
CAMEL-630.  Patch applied with many thanks to Clayton.

Added:
    activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/
    activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
    activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
    activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
Modified:
    activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
    activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
    activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
    activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
    activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
    activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java
    activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml

Modified: activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
(original)
+++ activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
Wed Nov  5 17:21:28 2008
@@ -18,11 +18,13 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Map;
 
 import com.ibatis.sqlmap.client.SqlMapClient;
 import com.ibatis.sqlmap.client.SqlMapClientBuilder;
 import org.apache.camel.Endpoint;
+import org.apache.camel.component.ResourceBasedComponent;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,55 +36,99 @@
  * for performing SQL operations using an XML mapping file to abstract away the SQL
  *
  * @version $Revision$
+ * 
+ * <pre>
+ * Ibatis Component used to read/write to a database.
+ *
+ * <u>Requires one of the following:</u>
+ *
+ * 1. A Sql Map config file either on the root of
+ * the classpath or explicitly set.
+ *
+ * <b>OR</b>
+ *
+ * 2. A SqlMapClient explicitly set.
+ *
+ * Using Ibatis as a source of data (&lt;from&gt;) you can use this component
+ * to treat a database table as a logical queue.
+ * Details are available in the {@link IBatisPollingConsumer}
+ *
+ * Using Ibatis as a destination for data (&lt;to&gt;) you can use this
+ * component to run an insert statement either on a single message or if the
+ * delivered content contains a collection of messages it can iterate through
+ * the collection and run the insert on each element.
+ * Details are available in the {@link IBatisProducer}
+ * </pre>
+ *
+ * @see IBatisProducer
+ * @see IBatisPollingConsumer
  */
-public class IBatisComponent extends DefaultComponent {
-    public static final String DEFAULT_CONFIG_URI = "SqlMapConfig.xml";
+public class IBatisComponent extends ResourceBasedComponent {
     private static final transient Log LOG = LogFactory.getLog(IBatisComponent.class);
-
-
+    private static final String DEFAULT_CONFIG_URI = "classpath:SqlMapConfig.xml";
     private SqlMapClient sqlMapClient;
-    private Resource sqlMapResource;
+    private String sqlMapConfig = DEFAULT_CONFIG_URI;
+    private boolean useTransactions = true;
 
-    public IBatisComponent() {
+    public IBatisComponent(){
     }
 
-    public IBatisComponent(SqlMapClient sqlMapClient) {
+    public IBatisComponent(SqlMapClient sqlMapClient){
         this.sqlMapClient = sqlMapClient;
     }
 
     // Properties
     //-------------------------------------------------------------------------
+
+    /**
+     * Returns the configured SqlMapClient.
+     *
+     * @return com.ibatis.sqlmap.client.SqlMapClient
+     * @throws IOException If configured with a SqlMapConfig and there
+     * is a problem reading the resource.
+     */
     public SqlMapClient getSqlMapClient() throws IOException {
         if (sqlMapClient == null) {
             sqlMapClient = createSqlMapClient();
         }
         return sqlMapClient;
     }
-
+    
+    /**
+     * Sets the SqlMapClient
+     * @param sqlMapClient The client
+     */
     public void setSqlMapClient(SqlMapClient sqlMapClient) {
         this.sqlMapClient = sqlMapClient;
     }
 
-    public Resource getSqlMapResource() {
-        if (sqlMapResource == null) {
-            sqlMapResource = new ClassPathResource(DEFAULT_CONFIG_URI);
-            LOG.debug("Defaulting to use the iBatis configuration from: " + sqlMapResource);
-        }
-        return sqlMapResource;
-    }
-
-    public void setSqlMapResource(Resource sqlMapResource) {
-        this.sqlMapResource = sqlMapResource;
-    }
-
-    // Implementation methods
-    //-------------------------------------------------------------------------
-    protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws
Exception {
-        return new IBatisEndpoint(uri, this, remaining);
-    }
-
-    protected SqlMapClient createSqlMapClient() throws IOException {
-        InputStream in = getSqlMapResource().getInputStream();
-        return SqlMapClientBuilder.buildSqlMapClient(in);
+    /**
+     * The Spring uri of the SqlMapConfig
+     * @return java.lang.String
+     */
+    public String getSqlMapConfig() {
+        return sqlMapConfig;
+    }
+
+    /**
+     * Creates an IbatisEndpoint for use by an IbatisConsumer or IbatisProducer.
+     */
+    @Override
+    protected IBatisEndpoint createEndpoint(String uri, String remaining, Map params) throws
Exception {
+        return new IBatisEndpoint(uri, this, remaining, params);
+    }
+    
+    private SqlMapClient createSqlMapClient() throws IOException {
+        Resource resource = resolveMandatoryResource(sqlMapConfig);
+        InputStream is = resource.getInputStream();
+        return SqlMapClientBuilder.buildSqlMapClient(new InputStreamReader(is));
+    }
+    
+    public boolean isUseTransactions() {
+        return useTransactions;
+    }
+    
+    public void setUseTransactions(boolean useTransactions) {
+        this.useTransactions = useTransactions;
     }
 }

Modified: activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
(original)
+++ activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
Wed Nov  5 17:21:28 2008
@@ -19,13 +19,21 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
 
 import com.ibatis.sqlmap.client.SqlMapClient;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.ibatis.strategy.DefaultIBatisProcessingStategy;
+import org.apache.camel.component.ibatis.strategy.IBatisProcessingStrategy;
 import org.apache.camel.impl.DefaultPollingEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * An <a href="http://activemq.apache.org/camel/ibatis.html">iBatis Endpoint</a>
@@ -33,24 +41,43 @@
  *
  * @version $Revision$
  */
-public class IBatisEndpoint extends DefaultPollingEndpoint {
-    private final String entityName;
+public class IBatisEndpoint extends DefaultPollingEndpoint<Exchange> {
+    private static final transient Log logger = LogFactory.getLog(IBatisEndpoint.class);
 
-    public IBatisEndpoint(String endpointUri, IBatisComponent component, String entityName)
{
-        super(endpointUri, component);
-        this.entityName = entityName;
-    }
+    private IBatisProcessingStrategy strategy;
+    /**
+     * Indicates if transactions are necessary.  Defaulted in IBatisComponent.
+     */
+    private boolean useTransactions;
+    /**
+     * Statement to run when polling or processing
+     */
+    private String statement;
+    /**
+     * Name of a strategy to use for dealing w/
+     * polling a database and consuming the message.  Can be a bean name
+     * or a class name.
+     */
+    private String consumeStrategyName;
+    /**
+     * URI parameters
+     */
+    private Map params;
 
-    public IBatisEndpoint(String endpointUri, String entityName) {
-        super(endpointUri);
-        this.entityName = entityName;
+    public IBatisEndpoint(String uri, IBatisComponent component, 
+            String statement, Map params) throws Exception {
+
+        super(uri, component);
+        this.params = params;
+        setUseTransactions(component.isUseTransactions());
+        setStatement(statement);
     }
 
     @Override
     public IBatisComponent getComponent() {
         return (IBatisComponent) super.getComponent();
     }
-
+    
     public boolean isSingleton() {
         return true;
     }
@@ -60,26 +87,106 @@
     }
 
     @Override
-    public PollingConsumer createPollingConsumer() throws Exception {
+    public IBatisPollingConsumer createConsumer(Processor processor) throws Exception {
+        IBatisPollingConsumer consumer = new IBatisPollingConsumer(this, processor);
+        configureConsumer(consumer);
+        return consumer;
+    }
+/*
+    @Override
+    public PollingConsumer<Exchange> createPollingConsumer() throws Exception {
         return new IBatisPollingConsumer(this);
     }
-
+*/
     /**
-     * Returns the iBatis SQL client
+     * @return SqlMapClient
+     * @throws IOException if the component is configured with a SqlMapConfig
+     * and there is a problem reading the file
      */
-    public SqlMapClient getSqlClient() throws IOException {
+    public SqlMapClient getSqlMapClient() throws IOException {
         return getComponent().getSqlMapClient();
     }
 
-    public String getEntityName() {
-        return entityName;
+    /**
+     * Gets the IbatisProcessingStrategy to use when consuming messages
+     * from the database
+     * @return IbatisProcessingStrategy
+     * @throws Exception
+     */
+    public IBatisProcessingStrategy getProcessingStrategy() throws Exception {
+        if (strategy == null) {
+            String strategyName = (String) params.get("consumeStrategy");
+            strategy = getStrategy(strategyName, new DefaultIBatisProcessingStategy());
+        }
+        return strategy;
     }
 
-    public void query(Message message) throws IOException, SQLException {
-        String name = getEntityName();
-        List list = getSqlClient().queryForList(name);
-        message.setBody(list);
-        message.setHeader("org.apache.camel.ibatis.queryName", name);
+    /**
+     * Statement to run when polling or processing
+     * @return name of the statement
+    */
+    public String getStatement() {
+        return statement;
+    }
+    
+    /**
+     * Statement to run when polling or processing
+     * @param statement
+     */
+    public void setStatement(String statement) {
+        this.statement = statement;
+    }
 
+    /**
+     * Resolves a strategy in the camelContext or by class name
+     * @param name
+     * @param defaultStrategy
+     * @return IbatisProcessingStrategy
+     * @throws Exception
+     */
+    private IBatisProcessingStrategy getStrategy(String name, IBatisProcessingStrategy defaultStrategy)
throws Exception {
+
+        if (name == null) {
+            return defaultStrategy;
+        }
+
+        IBatisProcessingStrategy strategy = getComponent().getCamelContext().getRegistry().lookup(name,
IBatisProcessingStrategy.class);
+        if (strategy == null) {
+            try {
+                Class<?> clazz = ObjectHelper.loadClass(name);
+                if (clazz != null) {
+                    strategy = ObjectHelper.newInstance(clazz, IBatisProcessingStrategy.class);
+                }
+            } catch(Exception e) {
+                logger.error("Failed to resolve/create processing strategy (" + name + ")",
e);
+                throw e;
+            }
+        }
+        
+        return strategy != null ? strategy : defaultStrategy;
+    }
+
+    /**
+     * Indicates if transactions should be used when calling statements.  Useful if using a comma separated list when
+     * consuming records.
+     * @return boolean
+     */
+    public boolean isUseTransactions() {
+        return useTransactions;
+    }
+
+    /**
+     * Sets indicator to use transactions for consuming and error handling statements.
+     * @param useTransactions
+     */
+    public void setUseTransactions(boolean useTransactions) {
+        this.useTransactions = useTransactions;
+    }
+
+    public String getConsumeStrategyName() {
+        return consumeStrategyName;
+    }
+    
+    public void setConsumeStrategyName(String consumeStrategyName) {
+        this.consumeStrategyName = consumeStrategyName;
     }
 }

Modified: activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
Wed Nov  5 17:21:28 2008
@@ -16,44 +16,182 @@
  */
 package org.apache.camel.component.ibatis;
 
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision$
+ * <pre>
+ *  Ibatis Camel Component used to read data from a database.
+ * 
+ *  Example Configuration :
+ *  &lt;route&gt;
+ *   &lt;from uri=&quot;ibatis:selectRecords&quot; /&gt;
+ *   &lt;to uri=&quot;jms:destinationQueue&quot; /&gt;
+ *  &lt;/route&gt;
+ * 
+ * 
+ *  This also can be configured to treat a table as a logical queue by defining
+ *  an &quot;onConsume&quot; statement.
+ * 
+ *  Example Configuration :
+ *  &lt;route&gt;
+ *   &lt;from uri=&quot;ibatis:selectRecords?consumer.onConsume=updateRecord&quot; /&gt;
+ *   &lt;to uri=&quot;jms:destinationQueue&quot; /&gt;
+ *  &lt;/route&gt;
+ * 
+ *  By default, if the select statement contains multiple rows, it will
+ *  iterate over the set and deliver each row to the route.  If this is not the
+ *  desired behavior then set &quot;useIterator=false&quot;.  This will deliver the entire
+ *  set to the route as a list.
+ * </pre>
+ *
+ * <b>URI Options</b>
+ * <table border="1">
+ * <thead>
+ * <th>Name</th>
+ * <th>Default Value</th>
+ * <th>description</th>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td>initialDelay</td>
+ * <td>1000 ms</td>
+ * <td>time before polling starts</td>
+ * </tr>
+ * <tr>
+ * <td>delay</td>
+ * <td>500 ms</td>
+ * <td>time before the next poll</td>
+ * </tr>
+ * <tr>
+ * <td>timeUnit</td>
+ * <td>MILLISECONDS</td>
+ * <td>Time unit to use for delay properties (NANOSECONDS, MICROSECONDS,
+ * MILLISECONDS, SECONDS)</td>
+ * </tr>
+ * <tr>
+ * <td>useIterator</td>
+ * <td>true</td>
+ * <td>If true, processes one exchange per row. If false processes one exchange
+ * for all rows</td>
+ * </tr>
+ * <tr>
+ * <td>onConsume</td>
+ * <td>null</td>
+ * <td>statement to run after data has been processed</td>
+ * </tr>
+ * </tbody> </table>
+ *
+ * @see strategy.IBatisProcessingStrategy
  */
-public class IBatisPollingConsumer extends PollingConsumerSupport {
-    private final IBatisEndpoint endpoint;
-
-    public IBatisPollingConsumer(IBatisEndpoint endpoint) {
-        super(endpoint);
-        this.endpoint = endpoint;
-    }
-
-    public Exchange receive(long timeout) {
-        return receiveNoWait();
-    }
-
-    public Exchange receive() {
-        return receiveNoWait();
-    }
-
-    public Exchange receiveNoWait() {
-        try {
-            Exchange exchange = endpoint.createExchange();
-            Message in = exchange.getIn();
-            endpoint.query(in);
-            return exchange;
-        } catch (Exception e) {
-            throw new RuntimeCamelException("Failed to poll: " + endpoint + ". Reason: "
+ e, e);
+public class IBatisPollingConsumer extends ScheduledPollConsumer<Exchange> {
+    private static Log logger = LogFactory.getLog(IBatisPollingConsumer.class);
+    /**
+     * Statement to run after data has been processed in the route
+     */
+    private String onConsume;
+    /**
+     * Process resultset individually or as a list
+     */
+    private boolean useIterator = true;
+
+    public IBatisPollingConsumer(IBatisEndpoint endpoint, Processor processor) throws Exception
{
+        super(endpoint, processor);
+    }
+
+    public IBatisEndpoint getEndpoint() {
+        return (IBatisEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Polls the database
+     */
+    @Override
+    protected void poll() throws Exception {
+        IBatisEndpoint endpoint = getEndpoint();
+        List data = endpoint.getProcessingStrategy().poll(this, getEndpoint());
+        if (useIterator) {
+            for (Object object : data) {
+                if (!super.isStopped()) {
+                    process(object);
+                }
+            }
+        } else {
+            process(data);
         }
     }
 
-    protected void doStart() throws Exception {
+    /**
+     * delivers the content
+     *
+     * @param data
+     *            a single row object if useIterator=true otherwise the entire
+     *            result set
+     */
+    protected void process(final Object data) throws Exception {
+        final IBatisEndpoint endpoint = getEndpoint();
+        final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
+
+        Message msg = exchange.getIn();
+        msg.setBody(data);
+        msg.setHeader("org.apache.camel.ibatis.queryName", endpoint.getStatement());
+    
+        logger.debug("Setting message");
+
+        getAsyncProcessor().process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                try {
+                    if (onConsume != null) {
+                        endpoint.getProcessingStrategy().commit(endpoint, exchange, data,
onConsume);
+                    }
+                } catch (Exception e) {
+                    handleException(e);
+                }
+            }
+        });
+    }
+    
+    /**
+     * Gets the statement to run after successful processing
+     * @return Name of the statement
+     */
+    public String getOnConsume() {
+        return onConsume;
+    }
+
+    /**
+     * Sets the statement to run after successful processing
+     * @param onConsume The name of the statement
+     */
+    public void setOnConsume(String onConsume) {
+        this.onConsume = onConsume;
     }
 
-    protected void doStop() throws Exception {
+
+    /**
+     * Indicates how resultset should be delivered to the route
+     * @return boolean 
+     */
+    public boolean isUseIterator() {
+        return useIterator;
+    }
+
+    /**
+     * Sets how resultset should be delivered to route.
+     * Indicates delivery as either a list or individual object.
+     * defaults to true.
+     * @param useIterator
+     */
+    public void setUseIterator(boolean useIterator) {
+        this.useIterator = useIterator;
     }
 }

Modified: activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
(original)
+++ activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
Wed Nov  5 17:21:28 2008
@@ -17,19 +17,25 @@
 package org.apache.camel.component.ibatis;
 
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 
+import com.ibatis.sqlmap.client.SqlMapClient;
+
 /**
  * @version $Revision$
  */
-public class IBatisProducer extends DefaultProducer {
-    private final IBatisEndpoint endpoint;
+public class IBatisProducer extends DefaultProducer<Exchange> {
+    private String statement;
+    private IBatisEndpoint endpoint;
 
     public IBatisProducer(IBatisEndpoint endpoint) {
         super(endpoint);
+        statement = endpoint.getStatement();
         this.endpoint = endpoint;
     }
 
@@ -38,26 +44,24 @@
         return (IBatisEndpoint) super.getEndpoint();
     }
 
+    /**
+     * Calls insert on the SqlMapClient.
+     */
     public void process(Exchange exchange) throws Exception {
+        SqlMapClient client = endpoint.getSqlMapClient();
         Object body = exchange.getIn().getBody();
         if (body == null) {
             // must be a poll so lets do a query
-            endpoint.query(exchange.getOut(true));
+            Message msg = exchange.getOut(true);
+            List list = client.queryForList(statement);
+            msg.setBody(list);
+            msg.setHeader("org.apache.camel.ibatis.queryName", statement);
         } else {
-            String operation = getOperationName(exchange);
-
             // lets handle arrays or collections of objects
             Iterator iter = ObjectHelper.createIterator(body);
             while (iter.hasNext()) {
-                endpoint.getSqlClient().insert(operation, iter.next());
+                client.insert(statement, iter.next());
             }
         }
     }
-
-    /**
-     * Returns the iBatis insert operation name
-     */
-    protected String getOperationName(Exchange exchange) {
-        return endpoint.getEntityName();
-    }
 }

Added: activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java?rev=711750&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
(added)
+++ activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
Wed Nov  5 17:21:28 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ibatis.strategy;
+
+import java.sql.Connection;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.ibatis.IBatisEndpoint;
+import org.apache.camel.component.ibatis.IBatisPollingConsumer;
+
+import com.ibatis.sqlmap.client.SqlMapClient;
+
+/**
+ * Default strategy for consuming messages for a route
+ */
+public class DefaultIBatisProcessingStategy implements IBatisProcessingStrategy {
+    /**
+     * Calls update on the SqlMapClient using the consumeStatement.
+     * Will call multiple statements if the consumeStatement is a comma separated list.
+     * The parameter passed to the statement is the original data delivered to the route.
+     */
+    public void commit(IBatisEndpoint endpoint, Exchange exchange, Object data, String consumeStatement)
throws Exception {
+
+        SqlMapClient client = endpoint.getSqlMapClient();
+        boolean useTrans = endpoint.isUseTransactions();
+        String[] statements = consumeStatement.split(",");
+        try{
+            if (useTrans){
+                client.startTransaction(Connection.TRANSACTION_REPEATABLE_READ);
+            }
+            for (String statement: statements) {
+                client.update(statement.trim(), data);
+            }
+            if (useTrans){
+                client.commitTransaction();
+            }
+        } finally {
+            if (useTrans) {
+                client.endTransaction();
+            }
+        }
+    }
+
+    public List poll(IBatisPollingConsumer consumer, IBatisEndpoint endpoint) throws Exception
{
+        SqlMapClient client = endpoint.getSqlMapClient();
+        return client.queryForList(endpoint.getStatement(), null);
+    }
+}

Added: activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java?rev=711750&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
(added)
+++ activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
Wed Nov  5 17:21:28 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ibatis.strategy;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.ibatis.IBatisEndpoint;
+import org.apache.camel.component.ibatis.IBatisPollingConsumer;
+
+/**
+ * Processing strategy for dealing with IBatis records 
+ *
+ */
+public interface IBatisProcessingStrategy {
+
+	/**
+	 * Called when record is being queried.
+	 * @param consumer     The Ibatis Polling Consumer
+	 * @param endpoint     The Ibatis Endpoint
+	 * @return             Results of the query as a java.util.List
+	 * @throws Exception
+	 */
+	List poll (IBatisPollingConsumer consumer, IBatisEndpoint endpoint) throws Exception;
+	
+	/**
+	 * Called if there is a statement to be run after processing
+	 * @param endpoint     The Ibatis Endpoint
+	 * @param exchange     The exchange after it has been processed
+	 * @param data         The original data delivered to the route
+	 * @param consumeStatement The update statement to run
+	 * @throws Exception
+	 */
+	void commit(IBatisEndpoint endpoint, Exchange exchange, Object data, String consumeStatement)
throws Exception;
+}

Modified: activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
(original)
+++ activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
Wed Nov  5 17:21:28 2008
@@ -89,7 +89,7 @@
 
     private Connection createConnection() throws Exception {
         IBatisEndpoint endpoint = resolveMandatoryEndpoint("ibatis:selectAllAccounts", IBatisEndpoint.class);
-        return endpoint.getSqlClient().getDataSource().getConnection();
+        return endpoint.getSqlMapClient().getDataSource().getConnection();
     }
 
 }
\ No newline at end of file

Added: activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java?rev=711750&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
(added)
+++ activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
Wed Nov  5 17:21:28 2008
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ibatis;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Checks that rows delivered by the polling consumer are flagged as processed
+ * by the statement named in <tt>consumer.onConsume</tt>.
+ */
+public class IBatisQueueTest extends ContextTestSupport {
+
+    public void testConsume() throws Exception {
+        MockEndpoint endpoint = getMockEndpoint("mock:results");
+        endpoint.expectedMinimumMessageCount(2);
+
+        Account account = new Account();
+        account.setId(1);
+        account.setFirstName("Bob");
+        account.setLastName("Denver");
+        account.setEmailAddress("TryGuessingGilligan@gmail.com");
+        template.sendBody("direct:start", account);
+
+        account = new Account();
+        account.setId(2);
+        account.setFirstName("Alan");
+        account.setLastName("Hale");
+        account.setEmailAddress("TryGuessingSkipper@gmail.com");
+        template.sendBody("direct:start", account);
+
+        assertMockEndpointsSatisifed();
+
+        // both accounts should now be flagged as processed
+        Object answer = template.sendBody("ibatis:selectProcessedAccounts", null);
+        List body = assertIsInstanceOf(List.class, answer);
+
+        assertEquals("Wrong size: " + body, 2, body.size());
+        Account actual = assertIsInstanceOf(Account.class, body.get(0));
+        assertEquals("Account.getFirstName()", "Bob", actual.getFirstName());
+        assertEquals("Account.getLastName()", "Denver", actual.getLastName());
+
+        // ... and no unprocessed account should be left behind
+        answer = template.sendBody("ibatis:selectUnprocessedAccounts", null);
+        body = assertIsInstanceOf(List.class, answer);
+        assertEquals("Wrong size: " + body, 0, body.size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // poll unprocessed rows; consumeAccount marks each one as processed
+                from("ibatis:selectUnprocessedAccounts?consumer.onConsume=consumeAccount").to("mock:results");
+
+                from("direct:start").to("ibatis:insertAccount");
+            }
+        };
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        // let's create the table the routes work on
+        executeSql("create table ACCOUNT ( ACC_ID INTEGER , ACC_FIRST_NAME VARCHAR(255), ACC_LAST_NAME VARCHAR(255), ACC_EMAIL VARCHAR(255), PROCESSED BOOLEAN DEFAULT false)");
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        executeSql("drop table ACCOUNT");
+    }
+
+    /**
+     * Runs one SQL statement on the endpoint's data source, always releasing
+     * the statement and connection so a failure cannot leak JDBC resources.
+     */
+    private void executeSql(String sql) throws Exception {
+        IBatisEndpoint endpoint = resolveMandatoryEndpoint("ibatis:Account", IBatisEndpoint.class);
+        Connection connection = endpoint.getSqlMapClient().getDataSource().getConnection();
+        try {
+            Statement statement = connection.createStatement();
+            try {
+                statement.execute(sql);
+            } finally {
+                statement.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+}

Modified: activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java Wed Nov  5 17:21:28 2008
@@ -91,7 +91,6 @@
 
     private Connection createConnection() throws Exception {
         IBatisEndpoint endpoint = resolveMandatoryEndpoint("ibatis:Account", IBatisEndpoint.class);
-        return endpoint.getSqlClient().getDataSource().getConnection();
+        return endpoint.getSqlMapClient().getDataSource().getConnection();
     }
-
 }

Modified: activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml (original)
+++ activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml Wed Nov  5 17:21:28 2008
@@ -80,4 +80,16 @@
     delete from ACCOUNT where ACC_ID = #id#
   </delete>
 
+  <!-- ACCOUNT rows where PROCESSED is false; polled by the IBatisQueueTest consumer route. -->
+  <select id="selectUnprocessedAccounts" resultMap="AccountResult">
+       select * from ACCOUNT where PROCESSED = false
+  </select>
+  <!-- ACCOUNT rows where PROCESSED is true; queried by IBatisQueueTest to verify consumption. -->
+  <select id="selectProcessedAccounts" resultMap="AccountResult">
+       select * from ACCOUNT where PROCESSED = true
+  </select>
+  <!-- Sets PROCESSED to true for the row whose ACC_ID matches #id#; used as consumer.onConsume in IBatisQueueTest. -->
+  <update id="consumeAccount" parameterClass="Account">
+       update ACCOUNT set PROCESSED = true where ACC_ID = #id#
+  </update>
 </sqlMap>
\ No newline at end of file



Mime
View raw message