camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject git commit: CAMEL-7364: Fixed JPA to not share EntityManager with concurrent threads or other exchanges, as an EntityManager is not thread-safe according to the JPA spec.
Date Tue, 02 Sep 2014 12:11:46 GMT
Repository: camel
Updated Branches:
  refs/heads/master bf88ab737 -> 2728b2f1d


CAMEL-7364: Fixed JPA to not share EntityManager with concurrent threads or other exchanges,
as an EntityManager is not thread-safe according to the JPA spec.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2728b2f1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2728b2f1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2728b2f1

Branch: refs/heads/master
Commit: 2728b2f1d9d5a3c1d2dae2e8efc04ec88d5bf4d9
Parents: bf88ab7
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue Sep 2 14:11:14 2014 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Sep 2 14:11:31 2014 +0200

----------------------------------------------------------------------
 .../idempotent/IdempotentConsumer.java          |  22 ++-
 .../idempotent/IdempotentOnCompletion.java      |  19 ++-
 .../camel/spi/ExchangeIdempotentRepository.java |  83 +++++++++
 .../apache/camel/spi/IdempotentRepository.java  |   6 +-
 .../ExchangeIdempotentConsumerTest.java         | 167 +++++++++++++++++++
 .../jpa/JpaCloseEntityManagerOnCompletion.java  |  43 +++++
 .../camel/component/jpa/JpaConstants.java       |   8 +-
 .../apache/camel/component/jpa/JpaConsumer.java |  24 ++-
 .../apache/camel/component/jpa/JpaEndpoint.java |  14 +-
 .../apache/camel/component/jpa/JpaHelper.java   |  68 ++++++++
 .../apache/camel/component/jpa/JpaProducer.java |  51 ++----
 .../idempotent/jpa/JpaMessageIdRepository.java  |  94 ++++++++---
 .../component/jpa/AbstractJpaMethodTest.java    |   2 +-
 .../fileConsumerJpaIdempotentTest-config.xml    |   2 +-
 14 files changed, 516 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
index d2cbc42..100a660 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
@@ -37,6 +38,15 @@ import org.slf4j.LoggerFactory;
 /**
  * An implementation of the <a
  * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a>
pattern.
+ * <p/>
+ * This implementation supports idempotent repositories implemented as
+ * <ul>
+ *     <li>IdempotentRepository</li>
+ *     <li>ExchangeIdempotentRepository</li>
+ * </ul>
+ *
+ * @see org.apache.camel.spi.IdempotentRepository
+ * @see org.apache.camel.spi.ExchangeIdempotentRepository
  */
 public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor>
{
     private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class);
@@ -76,10 +86,18 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor
         boolean newKey;
         if (eager) {
             // add the key to the repository
-            newKey = idempotentRepository.add(messageId);
+            if (idempotentRepository instanceof ExchangeIdempotentRepository) {
+                newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange,
messageId);
+            } else {
+                newKey = idempotentRepository.add(messageId);
+            }
         } else {
             // check if we already have the key
-            newKey = !idempotentRepository.contains(messageId);
+            if (idempotentRepository instanceof ExchangeIdempotentRepository) {
+                newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange,
messageId);
+            } else {
+                newKey = !idempotentRepository.contains(messageId);
+            }
         }
 
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
index 4e2c294..e0c5a25 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor.idempotent;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.spi.ExchangeIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.ExchangeHelper;
@@ -67,9 +68,17 @@ public class IdempotentOnCompletion implements Synchronization {
     protected void onCompletedMessage(Exchange exchange, String messageId) {
         if (!eager) {
             // if not eager we should add the key when its complete
-            idempotentRepository.add(messageId);
+            if (idempotentRepository instanceof ExchangeIdempotentRepository) {
+                ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange,
messageId);
+            } else {
+                idempotentRepository.add(messageId);
+            }
+        }
+        if (idempotentRepository instanceof ExchangeIdempotentRepository) {
+            ((ExchangeIdempotentRepository<String>) idempotentRepository).confirm(exchange,
messageId);
+        } else {
+            idempotentRepository.confirm(messageId);
         }
-        idempotentRepository.confirm(messageId);
     }
 
     /**
@@ -81,7 +90,11 @@ public class IdempotentOnCompletion implements Synchronization {
      */
     protected void onFailedMessage(Exchange exchange, String messageId) {
         if (removeOnFailure) {
-            idempotentRepository.remove(messageId);
+            if (idempotentRepository instanceof ExchangeIdempotentRepository) {
+                ((ExchangeIdempotentRepository<String>) idempotentRepository).remove(exchange,
messageId);
+            } else {
+                idempotentRepository.remove(messageId);
+            }
             LOG.debug("Removed from repository as exchange failed: {} with id: {}", exchange,
messageId);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/camel-core/src/main/java/org/apache/camel/spi/ExchangeIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ExchangeIdempotentRepository.java
b/camel-core/src/main/java/org/apache/camel/spi/ExchangeIdempotentRepository.java
new file mode 100644
index 0000000..f3e49d2
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/ExchangeIdempotentRepository.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Access to a repository of Message IDs to implement the
+ * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a>
pattern.
+ * <p/>
+ * The <tt>add</tt> and <tt>contains</tt> methods operate according
to the {@link java.util.Set} contract.
+ * <p/>
+ * The repository supports eager (default) and non-eager mode.
+ * <ul>
+ *     <li>eager: calls <tt>add</tt> and <tt>confirm</tt> if
complete, or <tt>remove</tt> if failed</li>
+ *     <li>non-eager: calls <tt>contains</tt> and <tt>add</tt>
if complete, or <tt>remove</tt> if failed</li>
+ * </ul>
+ * Notice the remove callback, can be configured to be disabled.
+ * <p/>
+ * This repository supports the operations to pass in the current exchange, which can be
needed by some implementations
+ * such as the JPA idempotent consumer.
+ *
+ * @version 
+ */
+public interface ExchangeIdempotentRepository<E> extends IdempotentRepository<E>
{
+
+    /**
+     * Adds the key to the repository.
+     * <p/>
+     * <b>Important:</b> Read the class javadoc about eager vs non-eager mode.
+     *
+     * @param key the key of the message for duplicate test
+     * @return <tt>true</tt> if this repository did <b>not</b> already
contain the specified element
+     */
+    boolean add(Exchange exchange, E key);
+
+    /**
+     * Returns <tt>true</tt> if this repository contains the specified element.
+     * <p/>
+     * <b>Important:</b> Read the class javadoc about eager vs non-eager mode.
+     *
+     * @param key the key of the message
+     * @return <tt>true</tt> if this repository contains the specified element
+     */
+    boolean contains(Exchange exchange, E key);
+
+    /**
+     * Removes the key from the repository.
+     * <p/>
+     * Is usually invoked if the exchange failed.
+     * <p/>
+     * <b>Important:</b> Read the class javadoc about eager vs non-eager mode.
+     *
+     * @param key the key of the message for duplicate test
+     * @return <tt>true</tt> if the key was removed
+     */
+    boolean remove(Exchange exchange, E key);
+
+    /**
+     * Confirms the key, after the exchange has been processed successfully.
+     * <p/>
+     * <b>Important:</b> Read the class javadoc about eager vs non-eager mode.
+     *
+     * @param key the key of the message for duplicate test
+     * @return <tt>true</tt> if the key was confirmed
+     */
+    boolean confirm(Exchange exchange, E key);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java b/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
index 29d6d1b..ff6611e 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
@@ -30,8 +30,12 @@ import org.apache.camel.Service;
  *     <li>non-eager: calls <tt>contains</tt> and <tt>add</tt>
if complete, or <tt>remove</tt> if failed</li>
  * </ul>
  * Notice the remove callback, can be configured to be disabled.
+ * <p/>
+ * Implementations for the <a href="http://camel.apache.org/idempotent-consumer.html">idempotent
consumer EIP</a>
+ * should favor using {@link org.apache.camel.spi.ExchangeIdempotentRepository} instead.
  *
- * @version 
+ * @version
+ * @see org.apache.camel.spi.ExchangeIdempotentRepository
  */
 public interface IdempotentRepository<E> extends Service {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/camel-core/src/test/java/org/apache/camel/processor/ExchangeIdempotentConsumerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ExchangeIdempotentConsumerTest.java
b/camel-core/src/test/java/org/apache/camel/processor/ExchangeIdempotentConsumerTest.java
new file mode 100644
index 0000000..294656b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ExchangeIdempotentConsumerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.ExchangeIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
+
+/**
+ * @version 
+ */
+public class ExchangeIdempotentConsumerTest extends ContextTestSupport {
+    protected Endpoint startEndpoint;
+    protected MockEndpoint resultEndpoint;
+
+    private MyIdempotentRepo repo = new MyIdempotentRepo();
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        assertEquals(0, repo.getExchange().size());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(
+                        header("messageId"), repo
+                ).to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+
+        // we used 6 different exchanges
+        assertEquals(6, repo.getExchange().size());
+
+        for (Exchange exchange : resultEndpoint.getExchanges()) {
+            // should be in repo list
+            assertTrue("Should contain the exchange", repo.getExchange().contains(exchange.getExchangeId()));
+        }
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        template.send(startEndpoint, new Processor() {
+            public void process(Exchange exchange) {
+                // now lets fire in a message
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        startEndpoint = resolveMandatoryEndpoint("direct:start");
+        resultEndpoint = getMockEndpoint("mock:result");
+    }
+
+    private class MyIdempotentRepo implements ExchangeIdempotentRepository<String>
{
+
+        private IdempotentRepository<String> delegate;
+        private Set<String> exchanges = new LinkedHashSet<String>();
+
+        private MyIdempotentRepo() {
+            delegate = MemoryIdempotentRepository.memoryIdempotentRepository(200);
+        }
+
+        @Override
+        public boolean add(Exchange exchange, String key) {
+            exchanges.add(exchange.getExchangeId());
+            return delegate.add(key);
+        }
+
+        @Override
+        public boolean contains(Exchange exchange, String key) {
+            exchanges.add(exchange.getExchangeId());
+            return delegate.contains(key);
+        }
+
+        @Override
+        public boolean remove(Exchange exchange, String key) {
+            exchanges.add(exchange.getExchangeId());
+            return delegate.remove(key);
+        }
+
+        @Override
+        public boolean confirm(Exchange exchange, String key) {
+            exchanges.add(exchange.getExchangeId());
+            return delegate.confirm(key);
+        }
+
+        @Override
+        public boolean add(String key) {
+            throw new UnsupportedOperationException("Should not be called");
+        }
+
+        @Override
+        public boolean contains(String key) {
+            throw new UnsupportedOperationException("Should not be called");
+        }
+
+        @Override
+        public boolean remove(String key) {
+            throw new UnsupportedOperationException("Should not be called");
+        }
+
+        @Override
+        public boolean confirm(String key) {
+            throw new UnsupportedOperationException("Should not be called");
+        }
+
+        public Set<String> getExchange() {
+            return exchanges;
+        }
+
+        @Override
+        public void start() throws Exception {
+            // noop
+        }
+
+        @Override
+        public void stop() throws Exception {
+            // noop
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaCloseEntityManagerOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaCloseEntityManagerOnCompletion.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaCloseEntityManagerOnCompletion.java
new file mode 100644
index 0000000..c508eca
--- /dev/null
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaCloseEntityManagerOnCompletion.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jpa;
+
+import javax.persistence.EntityManager;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Ordered;
+import org.apache.camel.support.SynchronizationAdapter;
+
+public class JpaCloseEntityManagerOnCompletion extends SynchronizationAdapter {
+
+    private final EntityManager entityManager;
+
+    public JpaCloseEntityManagerOnCompletion(EntityManager entityManager) {
+        this.entityManager = entityManager;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        entityManager.close();
+    }
+
+    @Override
+    public int getOrder() {
+        // we want to run as last as possible
+        return Ordered.LOWEST;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
index 3e42d9f..4466b3f 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
@@ -23,7 +23,13 @@ package org.apache.camel.component.jpa;
  */
 public final class JpaConstants {
 
-    public static final String ENTITYMANAGER = "CamelEntityManager";
+    public static final String ENTITY_MANAGER = "CamelEntityManager";
+
+    /**
+     * @deprecated use {@link #ENTITY_MANAGER}
+     */
+    @Deprecated
+    public static final String ENTITYMANAGER = ENTITY_MANAGER;
 
     private JpaConstants() {
         // utility class

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index 483c311..95c9d26 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
-
 import javax.persistence.Entity;
 import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
 import javax.persistence.LockModeType;
 import javax.persistence.OptimisticLockException;
 import javax.persistence.PersistenceException;
@@ -49,8 +49,9 @@ import org.springframework.transaction.support.TransactionTemplate;
 public class JpaConsumer extends ScheduledBatchPollingConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(JpaConsumer.class);
     private static final Map<String, Object> NOWAIT;
-    private final EntityManager entityManager;
+    private final EntityManagerFactory entityManagerFactory;
     private final TransactionTemplate transactionTemplate;
+    private EntityManager entityManager;
     private QueryFactory queryFactory;
     private DeleteHandler<Object> deleteHandler;
     private DeleteHandler<Object> preDeleteHandler;
@@ -78,7 +79,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
 
     public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.entityManager = endpoint.createEntityManager();
+        this.entityManagerFactory = endpoint.getEntityManagerFactory();
         this.transactionTemplate = endpoint.createTransactionTemplate();
     }
 
@@ -477,10 +478,21 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
     }
 
     @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.entityManager = entityManagerFactory.createEntityManager();
+        LOG.trace("Created EntityManager {} on {}", entityManager, this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+    @Override
     protected void doShutdown() throws Exception {
         super.doShutdown();
-        entityManager.close();
-        LOG.trace("closed the EntityManager {} on {}", entityManager, this);
+        this.entityManager.close();
+        LOG.trace("Closed EntityManager {} on {}", entityManager, this);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
index 09e549d..44fed92 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.jpa;
 
 import java.util.Map;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 
@@ -72,8 +71,6 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
     }
 
     /**
-     * 
-     * @param endpointUri
      * @deprecated use {@link JpaEndpoint#JpaEndpoint(String, JpaComponent)} instead
      */
     @Deprecated
@@ -88,9 +85,6 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
     }
 
     /**
-     * 
-     * @param endpointUri
-     * @param entityManagerFactory
      * @deprecated use {@link JpaEndpoint#JpaEndpoint(String, JpaComponent)} instead
      */
     @Deprecated
@@ -100,10 +94,6 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
     }
 
     /**
-     * 
-     * @param endpointUri
-     * @param entityManagerFactory
-     * @param transactionManager
      * @deprecated use {@link JpaEndpoint#JpaEndpoint(String, JpaComponent)} instead
      */
     @Deprecated
@@ -292,6 +282,10 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         return tm;
     }
 
+    /**
+     * @deprecated use {@link #getEntityManagerFactory()} to get hold of factory and create
an entity manager using the factory.
+     */
+    @Deprecated
     protected EntityManager createEntityManager() {
         return getEntityManagerFactory().createEntityManager();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
new file mode 100644
index 0000000..81df51b
--- /dev/null
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Helper for JPA.
+ */
+public final class JpaHelper {
+
+    private JpaHelper() {
+    }
+
+    /**
+     * Gets or creates an {@link javax.persistence.EntityManager} to use.
+     *
+     * @param exchange                 the current exchange, or <tt>null</tt>
if no exchange
+     * @param entityManagerFactory     the entity manager factory (mandatory)
+     * @param usePassedInEntityManager whether to use an existing {@link javax.persistence.EntityManager}
which has been stored
+     *                                 on the exchange in the header with key {@link org.apache.camel.component.jpa.JpaConstants#ENTITY_MANAGER}
+     * @return the entity manager (is never null)
+     */
+    public static EntityManager getTargetEntityManager(Exchange exchange, EntityManagerFactory
entityManagerFactory,
+                                                       boolean usePassedInEntityManager)
{
+        EntityManager em = null;
+
+        // favor using entity manager provided as a header from the end user
+        if (exchange != null && usePassedInEntityManager) {
+            em = exchange.getIn().getHeader(JpaConstants.ENTITY_MANAGER, EntityManager.class);
+        }
+
+        // then try reuse any entity manager which has been previously created and stored
on the exchange
+        if (em == null && exchange != null) {
+            em = exchange.getProperty(JpaConstants.ENTITY_MANAGER, EntityManager.class);
+        }
+
+        if (em == null) {
+            // create a new entity manager
+            em = entityManagerFactory.createEntityManager();
+            if (exchange != null) {
+                // we want to reuse the EM so store as property and make sure we close it
when done with the exchange
+                exchange.setProperty(JpaConstants.ENTITY_MANAGER, em);
+                exchange.addOnCompletion(new JpaCloseEntityManagerOnCompletion(em));
+            }
+        }
+
+        return em;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
index bd43978..1565d4a 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
@@ -17,8 +17,8 @@
 package org.apache.camel.component.jpa;
 
 import java.util.Collection;
-
 import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -29,19 +29,21 @@ import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
+import static org.apache.camel.component.jpa.JpaHelper.getTargetEntityManager;
+
 /**
  * @version 
  */
 public class JpaProducer extends DefaultProducer {
     private static final Logger LOG = LoggerFactory.getLogger(JpaProducer.class);
-    private final EntityManager entityManager;
+    private final EntityManagerFactory entityManagerFactory;
     private final TransactionTemplate transactionTemplate;
     private final Expression expression;
 
     public JpaProducer(JpaEndpoint endpoint, Expression expression) {
         super(endpoint);
         this.expression = expression;
-        this.entityManager = endpoint.createEntityManager();
+        this.entityManagerFactory = endpoint.getEntityManagerFactory();
         this.transactionTemplate = endpoint.createTransactionTemplate();
     }
 
@@ -51,16 +53,17 @@ public class JpaProducer extends DefaultProducer {
     }
 
     public void process(final Exchange exchange) {
-        final EntityManager targetEntityManager = getTargetEntityManager(exchange);
-        exchange.getIn().setHeader(JpaConstants.ENTITYMANAGER, targetEntityManager);
-
         final Object values = expression.evaluate(exchange, Object.class);
+
         if (values != null) {
+            final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory,
getEndpoint().isUsePassedInEntityManager());
+
             transactionTemplate.execute(new TransactionCallback<Object>() {
                 public Object doInTransaction(TransactionStatus status) {
                     if (getEndpoint().isJoinTransaction()) {
-                        targetEntityManager.joinTransaction();
+                        entityManager.joinTransaction();
                     }
+
                     if (values.getClass().isArray()) {
                         Object[] array = (Object[])values;
                         for (Object element : array) {
@@ -79,50 +82,28 @@ public class JpaProducer extends DefaultProducer {
                     }
 
                     if (getEndpoint().isFlushOnSend()) {
-                        // there may be concurrency so need to join tx before flush
-                        if (getEndpoint().isJoinTransaction()) {
-                            targetEntityManager.joinTransaction();
-                        }
-                        targetEntityManager.flush();
+                        entityManager.flush();
                     }
 
                     return null;
                 }
 
                 /**
-                 * save the given entity end return the managed entity
+                 * Save the given entity and return the managed entity
+                 *
                  * @return the managed entity
                  */
                 private Object save(final Object entity) {
-                    // there may be concurrency so need to join tx before persist/merge
-                    targetEntityManager.joinTransaction();
+                    LOG.debug("save: {}", entity);
                     if (getEndpoint().isUsePersist()) {
-                        targetEntityManager.persist(entity);
+                        entityManager.persist(entity);
                         return entity;
                     } else {
-                        return targetEntityManager.merge(entity);
+                        return entityManager.merge(entity);
                     }
                 }
             });
         }
     }
 
-    private EntityManager getTargetEntityManager(Exchange exchange) {
-        EntityManager em = this.entityManager;
-        if (getEndpoint().isUsePassedInEntityManager()) {
-            EntityManager passedIn = exchange.getIn().getHeader(JpaConstants.ENTITYMANAGER,
EntityManager.class);
-            if (passedIn != null) {
-                em = passedIn;
-            }
-        }
-        return em;
-    }
-
-    @Override
-    protected void doShutdown() throws Exception {
-        super.doShutdown();
-        entityManager.close();
-        LOG.trace("closed the EntityManager {} on {}", entityManager, this);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
index e0989f2..e352cff 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
@@ -18,31 +18,36 @@ package org.apache.camel.processor.idempotent.jpa;
 
 import java.util.Date;
 import java.util.List;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
 import javax.persistence.Query;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.spi.ExchangeIdempotentRepository;
 import org.apache.camel.support.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.orm.jpa.JpaTransactionManager;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
+import static org.apache.camel.component.jpa.JpaHelper.getTargetEntityManager;
+
 /**
- * @version 
+ * @version
  */
 @ManagedResource(description = "JPA based message id repository")
-public class JpaMessageIdRepository extends ServiceSupport implements IdempotentRepository<String>
{
+public class JpaMessageIdRepository extends ServiceSupport implements ExchangeIdempotentRepository<String>
{
     protected static final String QUERY_STRING = "select x from " + MessageProcessed.class.getName()
+ " x where x.processorName = ?1 and x.messageId = ?2";
+    private static final Logger LOG = LoggerFactory.getLogger(JpaMessageIdRepository.class);
     private final String processorName;
-    private final EntityManager entityManager;
+    private final EntityManagerFactory entityManagerFactory;
     private final TransactionTemplate transactionTemplate;
     private boolean joinTransaction = true;
 
@@ -51,7 +56,7 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent
     }
 
     public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory, TransactionTemplate
transactionTemplate, String processorName) {
-        this.entityManager = entityManagerFactory.createEntityManager();
+        this.entityManagerFactory = entityManagerFactory;
         this.processorName = processorName;
         this.transactionTemplate = transactionTemplate;
     }
@@ -72,15 +77,22 @@ public class JpaMessageIdRepository extends ServiceSupport implements
Idempotent
     }
 
     @ManagedOperation(description = "Adds the key to the store")
-    public boolean add(final String messageId) {
+    public boolean add(String messageId) {
+        return add(null, messageId);
+    }
+
+    @Override
+    public boolean add(final Exchange exchange, final String messageId) {
+        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory,
true);
+
         // Run this in single transaction.
         Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>()
{
-            public Boolean doInTransaction(TransactionStatus arg0) {
+            public Boolean doInTransaction(TransactionStatus status) {
                 if (isJoinTransaction()) {
                     entityManager.joinTransaction();
                 }
 
-                List<?> list = query(messageId);
+                List<?> list = query(entityManager, messageId);
                 if (list.isEmpty()) {
                     MessageProcessed processed = new MessageProcessed();
                     processed.setProcessorName(processorName);
@@ -94,19 +106,28 @@ public class JpaMessageIdRepository extends ServiceSupport implements
Idempotent
                 }
             }
         });
-        return rc.booleanValue();
+
+        LOG.debug("add {} -> {}", messageId, rc);
+        return rc;
     }
 
     @ManagedOperation(description = "Does the store contain the given key")
-    public boolean contains(final String messageId) {
+    public boolean contains(String messageId) {
+        return contains(null, messageId);
+    }
+
+    @Override
+    public boolean contains(final Exchange exchange, final String messageId) {
+        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory,
true);
+
         // Run this in single transaction.
         Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>()
{
-            public Boolean doInTransaction(TransactionStatus arg0) {
+            public Boolean doInTransaction(TransactionStatus status) {
                 if (isJoinTransaction()) {
                     entityManager.joinTransaction();
                 }
 
-                List<?> list = query(messageId);
+                List<?> list = query(entityManager, messageId);
                 if (list.isEmpty()) {
                     return Boolean.FALSE;
                 } else {
@@ -114,48 +135,66 @@ public class JpaMessageIdRepository extends ServiceSupport implements
Idempotent
                 }
             }
         });
-        return rc.booleanValue();
+
+        LOG.debug("contains {} -> {}", messageId, rc);
+        return rc;
     }
 
     @ManagedOperation(description = "Remove the key from the store")
-    public boolean remove(final String messageId) {
+    public boolean remove(String messageId) {
+        return remove(null, messageId);
+    }
+
+    @Override
+    public boolean remove(final Exchange exchange, final String messageId) {
+        final EntityManager entityManager = getTargetEntityManager(exchange, entityManagerFactory,
true);
+
         Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>()
{
-            public Boolean doInTransaction(TransactionStatus arg0) {
+            public Boolean doInTransaction(TransactionStatus status) {
                 if (isJoinTransaction()) {
                     entityManager.joinTransaction();
                 }
 
-                List<?> list = query(messageId);
+                List<?> list = query(entityManager, messageId);
                 if (list.isEmpty()) {
                     return Boolean.FALSE;
                 } else {
-                    MessageProcessed processed = (MessageProcessed)list.get(0);
+                    MessageProcessed processed = (MessageProcessed) list.get(0);
                     entityManager.remove(processed);
                     entityManager.flush();
                     return Boolean.TRUE;
                 }
             }
         });
-        return rc.booleanValue();
+
+        LOG.debug("remove {}", messageId);
+        return rc;
+    }
+
+    @Override
+    public boolean confirm(String messageId) {
+        return confirm(null, messageId);
+    }
+
+    @Override
+    public boolean confirm(final Exchange exchange, String messageId) {
+        LOG.debug("confirm {} -> true", messageId);
+        return true;
     }
-    
-    private List<?> query(final String messageId) {
+
+    private List<?> query(final EntityManager entityManager, final String messageId)
{
         Query query = entityManager.createQuery(QUERY_STRING);
         query.setParameter(1, processorName);
         query.setParameter(2, messageId);
         return query.getResultList();
     }
 
-    public boolean confirm(String s) {
-        // noop
-        return true;
-    }
-
     @ManagedAttribute(description = "The processor name")
     public String getProcessorName() {
         return processorName;
     }
 
+    @ManagedAttribute(description = "Whether to join existing transaction")
     public boolean isJoinTransaction() {
         return joinTransaction;
     }
@@ -166,10 +205,11 @@ public class JpaMessageIdRepository extends ServiceSupport implements
Idempotent
 
     @Override
     protected void doStart() throws Exception {
+        // noop
     }
 
     @Override
     protected void doStop() throws Exception {
-        entityManager.close();
+        // noop
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
index bc0aca4..cf95e519 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
@@ -148,7 +148,7 @@ public abstract class AbstractJpaMethodTest extends CamelTestSupport {
         endpoint = context.getEndpoint(endpointUri, JpaEndpoint.class);
 
         transactionTemplate = endpoint.createTransactionTemplate();
-        entityManager = endpoint.createEntityManager();
+        entityManager = endpoint.getEntityManagerFactory().createEntityManager();
         
         transactionTemplate.execute(new TransactionCallback<Object>() {
             public Object doInTransaction(TransactionStatus status) {

http://git-wip-us.apache.org/repos/asf/camel/blob/2728b2f1/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/fileConsumerJpaIdempotentTest-config.xml
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/fileConsumerJpaIdempotentTest-config.xml
b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/fileConsumerJpaIdempotentTest-config.xml
index 92b8021..a3ab5ec 100644
--- a/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/fileConsumerJpaIdempotentTest-config.xml
+++ b/components/camel-jpa/src/test/resources/org/apache/camel/processor/jpa/fileConsumerJpaIdempotentTest-config.xml
@@ -26,7 +26,7 @@
     <bean id="jpaStore" class="org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository">
         <!-- Here we refer to the entityManagerFactory -->
         <constructor-arg index="0" ref="entityManagerFactory"/>
-        <!-- This 2nd parameter is the name  (= a cateogry name).
+        <!-- This 2nd parameter is the name  (= a category name).
              You can have different repositories with different names -->
         <constructor-arg index="1" value="FileConsumer"/>
     </bean>


Mime
View raw message