camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1200861 - in /camel/trunk/components/camel-jpa/src: main/java/org/apache/camel/component/jpa/ test/java/org/apache/camel/processor/jpa/ test/resources/
Date Fri, 11 Nov 2011 12:53:35 GMT
Author: davsclaus
Date: Fri Nov 11 12:53:34 2011
New Revision: 1200861

URL: http://svn.apache.org/viewvc?rev=1200861&view=rev
Log:
CAMEL-4668: Fixed JPA consumer to rollback if one Exchange failed

Added:
    camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
      - copied, changed from r1200770, camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
Modified:
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
    camel/trunk/components/camel-jpa/src/test/resources/log4j.properties

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1200861&r1=1200860&r2=1200861&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
(original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
Fri Nov 11 12:53:34 2011
@@ -83,7 +83,11 @@ public class JpaConsumer extends Schedul
 
                 Query query = getQueryFactory().createQuery(entityManager);
                 configureParameters(query);
+                LOG.trace("Created query {}", query);
+
                 List<Object> results = CastUtils.cast(query.getResultList());
+                LOG.trace("Got result list from query {}", results);
+
                 for (Object result : results) {
                     DataHolder holder = new DataHolder();
                     holder.manager = entityManager;
@@ -96,9 +100,14 @@ public class JpaConsumer extends Schedul
                 try {
                     messagePolled = processBatch(CastUtils.cast(answer));
                 } catch (Exception e) {
-                    throw new PersistenceException(e);
+                    if (e instanceof PersistenceException) {
+                        throw (PersistenceException) e;
+                    } else {
+                        throw new PersistenceException(e);
+                    }
                 }
 
+                LOG.debug("Flushing EntityManager");
                 entityManager.flush();
                 return messagePolled;
             }
@@ -138,11 +147,12 @@ public class JpaConsumer extends Schedul
             if (lockEntity(result, entityManager)) {
                 // process the current exchange
                 LOG.debug("Processing exchange: {}", exchange);
-                try {
-                    getProcessor().process(exchange);
-                } catch (Exception e) {
-                    throw new PersistenceException(e);
+                getProcessor().process(exchange);
+                if (exchange.getException() != null) {
+                    // if we failed then throw exception
+                    throw exchange.getException();
                 }
+
                 getDeleteHandler().deleteObject(entityManager, result);
             }
         }
@@ -285,7 +295,7 @@ public class JpaConsumer extends Schedul
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " +
e, e);
             }
-            //TODO: Find if possible an alternative way to handle results of netive queries.
+            //TODO: Find if possible an alternative way to handle results of native queries.
             //Result of native queries are Arrays and cannot be locked by all JPA Providers.
             if (entity.getClass().isArray()) {
                 return true;

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java?rev=1200861&r1=1200860&r2=1200861&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
(original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
Fri Nov 11 12:53:34 2011
@@ -54,6 +54,7 @@ public class JpaEndpoint extends Schedul
     private boolean flushOnSend = true;
     private int maxMessagesPerPoll;
     private boolean usePersist;
+    private boolean transacted;
 
     public JpaEndpoint() {
     }
@@ -250,6 +251,14 @@ public class JpaEndpoint extends Schedul
         this.usePersist = usePersist;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 

Copied: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
(from r1200770, camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java?p2=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java&p1=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java&r1=1200770&r2=1200861&rev=1200861&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
(original)
+++ camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaTXRollbackTest.java
Fri Nov 11 12:53:34 2011
@@ -17,9 +17,11 @@
 package org.apache.camel.processor.jpa;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.examples.SendEmail;
@@ -38,28 +40,34 @@ import org.springframework.transaction.s
 /**
  * @version 
  */
-public class JpaBatchConsumerTest extends CamelTestSupport {
+public class JpaTXRollbackTest extends CamelTestSupport {
 
     protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName()
+ " x";
+    private static AtomicInteger foo = new AtomicInteger();
+    private static AtomicInteger bar = new AtomicInteger();
 
     protected ApplicationContext applicationContext;
     protected JpaTemplate jpaTemplate;
 
     @Test
-    public void testBatchConsumer() throws Exception {
-        // first create two records
+    public void testTXRollback() throws Exception {
+        // first create three records
         template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("foo@beer.org"));
         template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("bar@beer.org"));
+        template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("kaboom@beer.org"));
 
+        // should rollback the entire
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(2);
-        mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
-        mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
-        mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
-        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
+        // we should retry and try again
+        mock.expectedMinimumMessageCount(4);
+
+        // start route
+        context.startRoute("foo");
 
         assertMockEndpointsSatisfied();
+
+        assertTrue("Should be >= 2, was: " + foo.intValue(), foo.intValue() >= 2);
+        assertTrue("Should be >= 2, was: " + bar.intValue(), bar.intValue() >= 2);
     }
 
     @Override
@@ -67,7 +75,23 @@ public class JpaBatchConsumerTest extend
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("jpa://" + SendEmail.class.getName() + "?delay=2000").to("mock:result");
+                from("jpa://" + SendEmail.class.getName() + "?delay=2000").routeId("foo").noAutoStartup()
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                SendEmail send = exchange.getIn().getBody(SendEmail.class);
+                                if ("kaboom@beer.org".equals(send.getAddress())) {
+                                    throw new IllegalArgumentException("Forced");
+                                }
+
+                                if ("foo@beer.org".equals(send.getAddress())) {
+                                    foo.incrementAndGet();
+                                } else if ("bar@beer.org".equals(send.getAddress())) {
+                                    bar.incrementAndGet();
+                                }
+                            }
+                        })
+                        .to("mock:result");
             }
         };
     }

Modified: camel/trunk/components/camel-jpa/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/resources/log4j.properties?rev=1200861&r1=1200860&r2=1200861&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-jpa/src/test/resources/log4j.properties Fri Nov 11 12:53:34
2011
@@ -21,8 +21,9 @@
 log4j.rootLogger=INFO, file
 
 #log4j.logger.org.apache.camel=DEBUG
-log4j.logger.org.springframework=WARN
+#log4j.logger.org.springframework=WARN
 #log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.camel.component.jpa=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender



Mime
View raw message