camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1434584 - in /camel/trunk/components/camel-sql/src: main/java/org/apache/camel/component/sql/ test/java/org/apache/camel/component/sql/ test/resources/
Date Thu, 17 Jan 2013 08:12:40 GMT
Author: davsclaus
Date: Thu Jan 17 08:12:39 2013
New Revision: 1434584

URL: http://svn.apache.org/viewvc?rev=1434584&view=rev
Log:
CAMEL-5976: camel-sql now has batch consumer. Work in progress.

Added:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
  (with props)
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
  (with props)
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
  (with props)
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
  (with props)
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
  (with props)
Modified:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
    camel/trunk/components/camel-sql/src/test/resources/log4j.properties

Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434584&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
(added)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+
+/**
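+ * Default {@link SqlProcessingStrategy} which runs the query as a prepared statement with parameters taken from the message body.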
+ */
+public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
+
+    @Override
+    public void commit(SqlEndpoint endpoint, final Exchange exchange, Object data, JdbcTemplate
jdbcTemplate, final String query) throws Exception {
+        jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>()
{
+            public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException
{
+                int expected = ps.getParameterMetaData().getParameterCount();
+
+                Iterator<?> iterator = createIterator(exchange, query, expected);
+                if (iterator != null) {
+                    populateStatement(ps, iterator, expected);
+                    LOG.trace("Execute query {}", query);
+                    ps.execute();
+                }
+
+                return null;
+            };
+        });
+    }
+
+    private Iterator<?> createIterator(Exchange exchange, final String query, final
int expectedParams) {
+        Object body = exchange.getIn().getBody();
+        if (body == null) {
+            return null;
+        }
+
+        // TODO: support named parameters
+/*
+        if (body instanceof Map) {
+            final Map map = (Map) body;
+            return new Iterator() {
+
+                private int current;
+
+                @Override
+                public boolean hasNext() {
+                    return current < expectedParams;
+                }
+
+                @Override
+                public Object next() {
+                    current++;
+                    // TODO: Fix me
+                    return map.get("ID");
+                }
+
+                @Override
+                public void remove() {
+                    // noop
+                }
+            };
+        }*/
+
+        // else force as iterator based
+        Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
+        return iterator;
+    }
+
+    private void populateStatement(PreparedStatement ps, Iterator<?> iterator, int
expectedParams) throws SQLException {
+        int argNumber = 1;
+        if (expectedParams > 0) {
+            while (iterator != null && iterator.hasNext()) {
+                Object value = iterator.next();
+                LOG.trace("Setting parameter #{} with value: {}", argNumber, value);
+                ps.setObject(argNumber, value);
+                argNumber++;
+            }
+        }
+
+        if (argNumber - 1 != expectedParams) {
+            throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams
+ ", was:" + (argNumber - 1));
+        }
+    }
+
+}
+

Propchange: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434584&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
(added)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.RowMapperResultSetExtractor;
+
+/**
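+ * Scheduled batch polling consumer which executes the SQL query on each poll and routes the result as exchanges.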
+ */
+public class SqlConsumer extends ScheduledBatchPollingConsumer {
+
+    private final String query;
+    private final JdbcTemplate jdbcTemplate;
+
+    /**
+     * Statement to run after data has been processed in the route
+     */
+    private String onConsume;
+
+    /**
+     * Process resultset individually or as a list
+     */
+    private boolean useIterator = true;
+
+    /**
+     * Whether to allow an empty resultset to be routed to the next hop
+     */
+    private boolean routeEmptyResultSet;
+
+    private static final class DataHolder {
+        private Exchange exchange;
+        private Object data;
+
+        private DataHolder() {
+        }
+    }
+
+    public SqlConsumer(Endpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate,
String query) {
+        super(endpoint, processor);
+        this.jdbcTemplate = jdbcTemplate;
+        this.query = query;
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        Integer messagePolled = jdbcTemplate.execute(query, new PreparedStatementCallback<Integer>()
{
+            @Override
+            public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws
SQLException, DataAccessException {
+                Queue<DataHolder> answer = new LinkedList<DataHolder>();
+
+                ResultSet rs = preparedStatement.executeQuery();
+                try {
+                    log.trace("Got result list from query {}", rs);
+
+                    RowMapperResultSetExtractor<Map<String, Object>> mapper =
new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
+                    List<Map<String, Object>> data = mapper.extractData(rs);
+
+                    // create a list of exchange objects with the data
+                    if (useIterator) {
+                        for (Map<String, Object> item : data) {
+                            Exchange exchange = createExchange(item);
+                            DataHolder holder = new DataHolder();
+                            holder.exchange = exchange;
+                            holder.data = item;
+                            answer.add(holder);
+                        }
+                    } else {
+                        if (!data.isEmpty() || routeEmptyResultSet) {
+                            Exchange exchange = createExchange(data);
+                            DataHolder holder = new DataHolder();
+                            holder.exchange = exchange;
+                            holder.data = data;
+                            answer.add(holder);
+                        }
+                    }
+                } finally {
+                    rs.close();
+                }
+
+                // process all the exchanges in this batch
+                try {
+                    int rows = processBatch(CastUtils.cast(answer));
+                    return Integer.valueOf(rows);
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            }
+        });
+
+        return messagePolled;
+    }
+
+    protected Exchange createExchange(Object data) {
+        final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
+        Message msg = exchange.getIn();
+        msg.setBody(data);
+        return exchange;
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int total = exchanges.size();
+
+        // limit if needed
+        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+            log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as
there was " + total + " messages in this poll.");
+            total = maxMessagesPerPoll;
+        }
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            DataHolder holder = ObjectHelper.cast(DataHolder.class, exchanges.poll());
+            Exchange exchange = holder.exchange;
+            Object data = holder.data;
+
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // process the current exchange
+            log.debug("Processing exchange: {} with properties: {}", exchange, exchange.getProperties());
+            getProcessor().process(exchange);
+
+            // TODO: support when with CAMEL-5977
+            /*
+            try {
+                if (onConsume != null) {
+                    SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
+                    endpoint.getProcessingStrategy().commit(endpoint, exchange, data, jdbcTemplate,
onConsume);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }*/
+        }
+
+        return total;
+    }
+
+    /**
+     * Gets the statement(s) to run after successful processing.
+     * Use comma to separate multiple statements.
+     */
+    public String getOnConsume() {
+        return onConsume;
+    }
+
+    /**
+     * Sets the statement to run after successful processing.
+     * Use comma to separate multiple statements.
+     */
+    public void setOnConsume(String onConsume) {
+        this.onConsume = onConsume;
+    }
+
+    /**
+     * Indicates how resultset should be delivered to the route
+     */
+    public boolean isUseIterator() {
+        return useIterator;
+    }
+
+    /**
+     * Sets how the resultset should be delivered to the route.
+     * If true, each row is delivered as an individual exchange; if false, all rows are delivered as one list.
+     * Defaults to true.
+     */
+    public void setUseIterator(boolean useIterator) {
+        this.useIterator = useIterator;
+    }
+
+    /**
+     * Indicates whether empty resultset should be allowed to be sent to the next hop or
not
+     */
+    public boolean isRouteEmptyResultSet() {
+        return routeEmptyResultSet;
+    }
+
+    /**
+     * Sets whether an empty resultset should be allowed to be sent to the next hop.
+     * Defaults to false, so an empty resultset is filtered out.
+     */
+    public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
+        this.routeEmptyResultSet = routeEmptyResultSet;
+    }
+
+}
+

Propchange: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1434584&r1=1434583&r2=1434584&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
(original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
Thu Jan 17 08:12:39 2013
@@ -20,7 +20,7 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultPollingEndpoint;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.springframework.jdbc.core.JdbcTemplate;
 
@@ -29,10 +29,15 @@ import org.springframework.jdbc.core.Jdb
  * question marks (that are parameter placeholders), sharp signs should be used.
  * This is because in camel question mark has other meaning.
  */
-public class SqlEndpoint extends DefaultEndpoint {
+public class SqlEndpoint extends DefaultPollingEndpoint {
     private JdbcTemplate jdbcTemplate;
     private String query;
     private boolean batch;
+    private int maxMessagesPerPoll;
+    private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
+    private String onConsume;
+
+    // TODO: onConsumeBatchDone to execute a query when batch done
 
     public SqlEndpoint() {
     }
@@ -42,9 +47,13 @@ public class SqlEndpoint extends Default
         this.jdbcTemplate = jdbcTemplate;
         this.query = query;
     }
-    
+
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new UnsupportedOperationException("Not implemented");
+        SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
+        consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+        consumer.setOnConsume(getOnConsume());
+        configureConsumer(consumer);
+        return consumer;
     }
 
     public Producer createProducer() throws Exception {
@@ -79,6 +88,30 @@ public class SqlEndpoint extends Default
         this.batch = batch;
     }
 
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
+    public SqlProcessingStrategy getProcessingStrategy() {
+        return processingStrategy;
+    }
+
+    public void setProcessingStrategy(SqlProcessingStrategy processingStrategy) {
+        this.processingStrategy = processingStrategy;
+    }
+
+    public String getOnConsume() {
+        return onConsume;
+    }
+
+    public void setOnConsume(String onConsume) {
+        this.onConsume = onConsume;
+    }
+
     @Override
     protected String createEndpointUri() {
         // Make sure it's properly encoded

Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434584&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
(added)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Exchange;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Processing strategy for dealing with SQL when consuming.
+ */
+public interface SqlProcessingStrategy {
+
+    /**
+     * Commit callback if there is a query to be run after processing.
+     *
+     * @param endpoint     the endpoint
+     * @param exchange     The exchange after it has been processed
+     * @param data         The original data delivered to the route
+     * @param jdbcTemplate The JDBC template
+     * @param query        The SQL query to execute
+     * @throws Exception can be thrown in case of error
+     */
+    void commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate,
String query) throws Exception;
+}

Propchange: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434584&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(added)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+@Ignore
+public class SqlConsumerDeleteTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+            .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testConsume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = mock.getReceivedExchanges();
+        assertEquals(3, exchanges.size());
+
+        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("sql:select * from projects order by id?consumer.onConsume=delete from
projects where id = #")
+                    .to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java?rev=1434584&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
(added)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
Thu Jan 17 08:12:39 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+public class SqlConsumerTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+            .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testConsume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(3);
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = mock.getReceivedExchanges();
+        assertTrue(exchanges.size() >= 3);
+
+        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("sql:select * from projects order by id")
+                    .to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-sql/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/log4j.properties?rev=1434584&r1=1434583&r2=1434584&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-sql/src/test/resources/log4j.properties Thu Jan 17 08:12:39
2013
@@ -20,8 +20,9 @@
 #
 log4j.rootLogger=INFO, file
 
-#log4j.logger.org.apache.activemq=DEBUG
-#log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel.component.sql=DEBUG
+#log4j.logger.org.apache.camel.processor.aggregate.sql=DEBUG
+#log4j.logger.org.apache.camel.processor.idempotent.sql=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender



Mime
View raw message