camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-9346: camel-sql - Add transacted option
Date Sat, 21 Nov 2015 11:00:59 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x e34882f58 -> 584725f48
  refs/heads/master 6b2a7b12a -> 895c938ce


CAMEL-9346: camel-sql - Add transacted option


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/895c938c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/895c938c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/895c938c

Branch: refs/heads/master
Commit: 895c938ced4f7c0f5a36683d0ccc159f5d6fa62d
Parents: 6b2a7b1
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sat Nov 21 12:00:14 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sat Nov 21 12:00:14 2015 +0100

----------------------------------------------------------------------
 .../camel/component/sql/DefaultSqlEndpoint.java      | 15 +++++++++++++++
 .../org/apache/camel/component/sql/SqlConsumer.java  | 11 +++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/895c938c/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
index 2de1d64..0f7d4ce 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -43,6 +43,9 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
     private String dataSourceRef;
     @UriParam(description = "Sets the DataSource to use to communicate with the database.")
     private DataSource dataSource;
+    @UriParam(label = "consumer", description = "Enables or disables transaction. If enabled then if processing an exchange failed then the consumer"
+            + "break out processing any further exchanges to cause a rollback eager.")
+    private boolean transacted;
     @UriParam(label = "producer", description = "Enables or disables batch mode")
     private boolean batch;
     @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll")
@@ -125,6 +128,18 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
         this.jdbcTemplate = jdbcTemplate;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Enables or disables transaction. If enabled then if processing an exchange failed then the consumer
+     + break out processing any further exchanges to cause a rollback eager
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
     public boolean isBatch() {
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/895c938c/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index ce8135a..40e0eb9 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -27,6 +27,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -207,6 +208,16 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
                 exchange.setException(e);
             }
 
+            if (getEndpoint().isTransacted() && exchange.isFailed()) {
+                // break out as we are transacted and should rollback
+                Exception cause = exchange.getException();
+                if (cause != null) {
+                    throw cause;
+                } else {
+                    throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange);
+                }
+            }
+
             // pick the on consume to use
             String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
             try {


Mime
View raw message