Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 410AD200BF3 for ; Wed, 30 Nov 2016 15:49:33 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3FF04160B13; Wed, 30 Nov 2016 14:49:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CFFA4160B19 for ; Wed, 30 Nov 2016 15:49:31 +0100 (CET) Received: (qmail 92762 invoked by uid 500); 30 Nov 2016 14:49:30 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 92645 invoked by uid 99); 30 Nov 2016 14:49:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Nov 2016 14:49:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A007FE7DE7; Wed, 30 Nov 2016 14:49:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 30 Nov 2016 14:49:32 -0000 Message-Id: In-Reply-To: <470e8a3dd96d47a4a9ee06e2e7acc269@git.apache.org> References: <470e8a3dd96d47a4a9ee06e2e7acc269@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/12] camel git commit: CAMEL-10490 Changed to optimistic lock mode type and added test for concurrent access without lock archived-at: Wed, 30 Nov 2016 14:49:33 -0000 CAMEL-10490 Changed to optimistic lock mode type and added test for concurrent access without lock Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6795c114 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6795c114 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6795c114 Branch: refs/heads/master Commit: 6795c114ac825f020b85d2a6929ddf5f200c78d5 Parents: 9a5f83f Author: Bob Gaudaen Authored: Wed Nov 30 13:39:21 2016 +0100 Committer: Bob Gaudaen Committed: Wed Nov 30 14:28:28 2016 +0100 ---------------------------------------------------------------------- .../camel/component/jpa/JpaPollingConsumer.java | 38 +++++--- .../org/apache/camel/examples/Customer.java | 6 +- .../camel/processor/jpa/AbstractJpaTest.java | 22 ++++- .../jpa/JpaPollingConsumerLockEntityTest.java | 98 +++++++++++++------- .../processor/jpa/JpaPollingConsumerTest.java | 17 +--- 5 files changed, 116 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6795c114/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java index f787393..8b085ad 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java @@ -29,6 +29,7 @@ import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Query; import javax.persistence.LockModeType; +import javax.persistence.PersistenceException; import org.apache.camel.Exchange; import org.apache.camel.impl.PollingConsumerSupport; @@ -144,21 +145,32 @@ public class JpaPollingConsumer extends PollingConsumerSupport { LOG.trace("Created query {}", query); Object answer; - List results = query.getResultList(); - if (results != null && results.size() == 1) { - // we only have 1 entity so return that - answer = results.get(0); - } else { - // we have more data so return a list - answer = results; - } + try { + List results = query.getResultList(); + + if (results != null && results.size() == 1) { + // we only have 1 entity so return that + answer = results.get(0); + } else { + // we have more data so return a list + answer = results; + } + + // commit + LOG.debug("Flushing EntityManager"); + entityManager.flush(); - // commit - LOG.debug("Flushing EntityManager"); - entityManager.flush(); - // must clear after flush - entityManager.clear(); + // must clear after flush + entityManager.clear(); + + } catch (PersistenceException e) { + LOG.info("Disposing EntityManager {} on {} due to coming transaction rollback", entityManager, this); + + entityManager.close(); + + throw e; + } return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/6795c114/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java b/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java index 1bf2ec6..370cdf9 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/examples/Customer.java @@ -22,6 +22,7 @@ import javax.persistence.GeneratedValue; import javax.persistence.Id; import javax.persistence.NamedQuery; import javax.persistence.OneToOne; +import javax.persistence.Version; /** * @version @@ -39,6 +40,9 @@ public class Customer { private Address address; private int orderCount; + @Version + private Long version; + public Long getId() { return id; } @@ -74,7 +78,7 @@ public class Customer { @Override public String toString() { // OpenJPA warns about fields being accessed directly in methods if NOT using the corresponding getters. - return "Customer[id: " + getId() + ", name: " + getName() + ", address: " + getAddress() + "]"; + return "Customer[id: " + getId() + ", version: " + version + ", name: " + getName() + ", address: " + getAddress() + "]"; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/6795c114/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java index fd0aac5..09d4af0 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/AbstractJpaTest.java @@ -25,6 +25,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.examples.SendEmail; import org.apache.camel.spring.SpringCamelContext; import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.openjpa.persistence.util.SourceCode; import org.junit.After; import org.junit.Before; import org.springframework.context.ApplicationContext; @@ -80,10 +81,25 @@ public abstract class AbstractJpaTest extends CamelTestSupport { } protected void assertEntityInDB(int size) throws Exception { - List list = entityManager.createQuery(selectAllString()).getResultList(); - assertEquals(size, list.size()); + assertEntityInDB(size, SendEmail.class); + } + + protected void assertEntityInDB(int size, Class entityType) { + List results = entityManager.createQuery("select o from " + entityType.getName() + " o").getResultList(); + assertEquals(size, results.size()); - assertIsInstanceOf(SendEmail.class, list.get(0)); + assertIsInstanceOf(entityType, results.get(0)); + } + + protected void saveEntityInDB(final Object entity) { + transactionTemplate.execute(new TransactionCallback() { + public Object doInTransaction(TransactionStatus status) { + entityManager.joinTransaction(); + entityManager.persist(entity); + entityManager.flush(); + return null; + } + }); } protected abstract String routeXml(); http://git-wip-us.apache.org/repos/asf/camel/blob/6795c114/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java index 89c95db..89b3a47 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerLockEntityTest.java @@ -16,50 +16,42 @@ */ package org.apache.camel.processor.jpa; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.examples.Customer; +import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spring.SpringRouteBuilder; +import org.junit.Before; import org.junit.Test; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; +import javax.persistence.OptimisticLockException; import java.util.HashMap; -import java.util.List; import java.util.Map; public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest { protected static final String SELECT_ALL_STRING = "select x from " + Customer.class.getName() + " x"; - protected void save(final Customer customer) { - transactionTemplate.execute(new TransactionCallback() { - public Object doInTransaction(TransactionStatus status) { - entityManager.joinTransaction(); - entityManager.persist(customer); - entityManager.flush(); - return null; - } - }); - } - - protected void assertEntitiesInDatabase(int count, String entity) { - List results = entityManager.createQuery("select o from " + entity + " o").getResultList(); - assertEquals(count, results.size()); - } + @Before + @Override + public void setUp() throws Exception { + super.setUp(); - @Test - public void testPollingConsumerHandlesLockedEntity() throws Exception { Customer customer = new Customer(); customer.setName("Donald Duck"); - save(customer); + saveEntityInDB(customer); Customer customer2 = new Customer(); customer2.setName("Goofy"); - save(customer2); + saveEntityInDB(customer2); - assertEntitiesInDatabase(2, Customer.class.getName()); + assertEntityInDB(2, Customer.class); + } + + @Test + public void testPollingConsumerWithLock() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); + MockEndpoint mock = getMockEndpoint("mock:locked"); mock.expectedBodiesReceived( "orders: 1", "orders: 2" @@ -68,8 +60,27 @@ public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest { Map headers = new HashMap<>(); headers.put("name", "Donald%"); - template.asyncRequestBodyAndHeaders("direct:start", "message", headers); - template.asyncRequestBodyAndHeaders("direct:start", "message", headers); + template.asyncRequestBodyAndHeaders("direct:locked", "message", headers); + template.asyncRequestBodyAndHeaders("direct:locked", "message", headers); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testPollingConsumerWithoutLock() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:not-locked"); + MockEndpoint errMock = getMockEndpoint("mock:error"); + + mock.expectedBodiesReceived("orders: 1"); + + errMock.expectedMessageCount(1); + errMock.message(0).body().isInstanceOf(OptimisticLockException.class); + + Map headers = new HashMap<>(); + headers.put("name", "Donald%"); + + template.asyncRequestBodyAndHeaders("direct:not-locked", "message", headers); + template.asyncRequestBodyAndHeaders("direct:not-locked", "message", headers); assertMockEndpointsSatisfied(); } @@ -78,18 +89,39 @@ public class JpaPollingConsumerLockEntityTest extends AbstractJpaTest { protected RouteBuilder createRouteBuilder() { return new SpringRouteBuilder() { public void configure() { - from("direct:start") - .transacted() - .pollEnrich().simple("jpa://" + Customer.class.getName() + "?joinTransaction=true&consumeLockEntity=true&query=select c from Customer c where c.name like '${header.name}'") - .aggregationStrategy((originalExchange, jpaExchange) -> { + + AggregationStrategy enrichStrategy = new AggregationStrategy() { + @Override + public Exchange aggregate(Exchange originalExchange, Exchange jpaExchange) { Customer customer = jpaExchange.getIn().getBody(Customer.class); customer.setOrderCount(customer.getOrderCount()+1); return jpaExchange; - }) - .to("jpa://" + Customer.class.getName() + "?joinTransaction=true&usePassedInEntityManager=true") + } + }; + + onException(Exception.class) + .setBody().simple("${exception}") + .to("mock:error") + .handled(true); + + from("direct:locked") + .onException(OptimisticLockException.class) + .redeliveryDelay(60) + .maximumRedeliveries(2) + .end() + .pollEnrich().simple("jpa://" + Customer.class.getName() + "?lockModeType=OPTIMISTIC_FORCE_INCREMENT&query=select c from Customer c where c.name like '${header.name}'") + .aggregationStrategy(enrichStrategy) + .to("jpa://" + Customer.class.getName()) + .setBody().simple("orders: ${body.orderCount}") + .to("mock:locked"); + + from("direct:not-locked") + .pollEnrich().simple("jpa://" + Customer.class.getName() + "?query=select c from Customer c where c.name like '${header.name}'") + .aggregationStrategy(enrichStrategy) + .to("jpa://" + Customer.class.getName()) .setBody().simple("orders: ${body.orderCount}") - .to("mock:result"); + .to("mock:not-locked"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/6795c114/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java index 5b72211..a161a71 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaPollingConsumerTest.java @@ -23,23 +23,10 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.examples.Customer; import org.apache.camel.spring.SpringRouteBuilder; import org.junit.Test; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; public class JpaPollingConsumerTest extends AbstractJpaTest { protected static final String SELECT_ALL_STRING = "select x from " + Customer.class.getName() + " x"; - protected void save(final Customer customer) { - transactionTemplate.execute(new TransactionCallback() { - public Object doInTransaction(TransactionStatus status) { - entityManager.joinTransaction(); - entityManager.persist(customer); - entityManager.flush(); - return null; - } - }); - } - protected void assertEntitiesInDatabase(int count, String entity) { List results = entityManager.createQuery("select o from " + entity + " o").getResultList(); assertEquals(count, results.size()); @@ -49,10 +36,10 @@ public class JpaPollingConsumerTest extends AbstractJpaTest { public void testPollingConsumer() throws Exception { Customer customer = new Customer(); customer.setName("Donald Duck"); - save(customer); + saveEntityInDB(customer); Customer customer2 = new Customer(); customer2.setName("Goofy"); - save(customer2); + saveEntityInDB(customer2); assertEntitiesInDatabase(2, Customer.class.getName());