camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r673008 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/ components/camel-...
Date Tue, 01 Jul 2008 06:32:48 GMT
Author: davsclaus
Date: Mon Jun 30 23:32:47 2008
New Revision: 673008

URL: http://svn.apache.org/viewvc?rev=673008&view=rev
Log:
CAMEL-634: Applied patch with thanks to Marat. Added unit test for transactional DataSource.

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
    activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
    activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
    activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
    activemq/camel/trunk/components/camel-spring/pom.xml
    activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Jun 30
23:32:47 2008
@@ -170,6 +170,11 @@
     boolean isFailed();
 
     /**
+     * Returns true if this exchange is transacted
+     */
+    boolean isTransacted();
+    
+    /**
      * Returns the container so that a processor can resolve endpoints from URIs
      *
      * @return the container which owns this exchange

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java?rev=673008&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeProperty.java Mon
Jun 30 23:32:47 2008
@@ -0,0 +1,135 @@
+package org.apache.camel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents an instance and a type safe registry of well known Camel Exchange properties.
+ * <p/>
+ * <b>Usage pattern:</b>
+ * <br/>In your code register a property that you wish to pass via Camel Exchange:
+ * <pre>
+ *      public static final ExchangeProperty<Boolean> myProperty =
+ *            new ExchangeProperty<Boolean>("myProperty", "org.apache.myproject.mypackage.myproperty",
Boolean.class);
+ *
+ *  Then in your code set this property's value:
+ *      myProperty.set(exchange, Boolean.TRUE);
+ *
+ *  Check the value of this property where required:
+ *      ExchangeProperty<?> property = ExchangeProperty.get("myProperty");
+ *      if (property != null && property.get(exchange) == Boolean.TRUE) {
+ *           // do your thing ...
+ *       }
+ *  Or
+ *      Boolean value = myProperty.get(exchange);
+ *      if (value == Boolean.TRUE) {
+ *          // do your thing
+ *      }
+ *
+ *  When your code no longer requires this property then deregister it:
+ *      ExchangeProperty.deregister(myProperty);
+ *  Or
+ *      ExchangeProperty.deregister("myProperty");
+ *  </pre>
+ *
+ *  <b>Note:</b> that if ExchangeProperty instance get or set methods are used
then type checks
+ *  of property's value are performed and a runtime exception can be thrown if type 
+ *  safety is violated.
+ */
+public class ExchangeProperty<T> {
+    private final String literal;
+    private final String name;
+    private final Class<T> type;
+
+    private static final List<ExchangeProperty<?>> values = 
+        new ArrayList<ExchangeProperty<?>>();
+
+    private static final Map<String, ExchangeProperty<?>> literalMap = 
+        new HashMap<String, ExchangeProperty<?>>();
+    
+    private static final Map<String, ExchangeProperty<?>> nameMap = 
+        new HashMap<String, ExchangeProperty<?>>();
+
+    public ExchangeProperty(String literal, String name, Class<T> type) {
+        this.literal = literal;
+        this.name = name;
+        this.type = type;
+        register(this);
+    }
+
+    public String literal() {
+        return literal;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Class<T> type() {
+        return type;
+    }
+
+    public T get(Exchange exchange) {
+        return exchange.getProperty(name, type);
+    }
+
+    public static ExchangeProperty<?> get(String literal) {
+        return literalMap.get(literal);
+    }
+
+    public static ExchangeProperty<?> getByName(String name) {
+        return nameMap.get(name);
+    }
+
+    public T set(Exchange exchange, T value) {
+        T oldValue = get(exchange);
+        exchange.setProperty(name, value);
+        return oldValue;
+    }
+
+    public T remove(Exchange exchange) {
+        T oldValue = get(exchange);
+        exchange.removeProperty(name);
+        return oldValue;
+    }
+
+    @Override
+    public String toString() {
+        return type().getCanonicalName() + " " + name + " (" + literal() + ")";
+    }
+
+    public static synchronized void register(ExchangeProperty<?> property) {
+        ExchangeProperty<?> existingProperty = literalMap.get(property.literal());
+        if (existingProperty != null && existingProperty != property) {
+            throw new RuntimeCamelException("An Exchange Property '" + property.literal()

+                    + "' has already been registered; its traits are: " + existingProperty.toString());
+        }
+        values.add(property);
+        literalMap.put(property.literal(), property);
+        nameMap.put(property.name(), property);
+    }
+
+    public static synchronized void deregister(ExchangeProperty<?> property) {
+        if (property != null) {
+            values.remove(property);
+            literalMap.remove(property.literal());
+            nameMap.put(property.name(), property);
+        }
+    }
+
+    public static synchronized void deregister(String literal) {
+        ExchangeProperty<?> property = literalMap.get(literal);
+        if (property != null) {
+            values.remove(property);
+            literalMap.remove(property.literal());
+            nameMap.put(property.name(), property);
+        }
+    }
+
+    public static synchronized ExchangeProperty<?>[] values() {
+        return values.toArray(new ExchangeProperty[0]);
+    }
+
+}
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
Mon Jun 30 23:32:47 2008
@@ -22,6 +22,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeProperty;
 import org.apache.camel.Message;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.UnitOfWork;
@@ -33,6 +34,7 @@
  * @version $Revision$
  */
 public class DefaultExchange implements Exchange {
+
     private static final UuidGenerator DEFAULT_ID_GENERATOR = new UuidGenerator();
     protected final CamelContext context;
     private Map<String, Object> properties;
@@ -133,13 +135,39 @@
 
     public <T> T getProperty(String name, Class<T> type) {
         Object value = getProperty(name);
+
+        // if the property is also a well known property in ExchangeProperty then validate
that the
+        // value is of the same type
+        ExchangeProperty<?> property = ExchangeProperty.getByName(name);
+        if (property != null) {
+            validateExchangePropertyIsExpectedType(property, type, value);
+        }
+
         return getContext().getTypeConverter().convertTo(type, value);
     }
 
     public void setProperty(String name, Object value) {
+        ExchangeProperty<?> property = ExchangeProperty.getByName(name);
+
+        // if the property is also a well known property in ExchangeProperty then validate
that the
+        // value is of the same type
+        if (property != null) {
+            Class type = value.getClass();
+            validateExchangePropertyIsExpectedType(property, type, value);
+        }
+
         getProperties().put(name, value);
     }
 
+    private <T> void validateExchangePropertyIsExpectedType(ExchangeProperty<?>
property, Class<T> type, Object value) {
+        if (value != null && property != null && !property.type().isAssignableFrom(type))
{
+            throw new RuntimeCamelException("Type cast exception while getting an "
+                    + "Exchange Property value '" + value.toString()
+                    + "' on Exchange " + this
+                    + " for a well known Exchange Property with these traits: " + property);
+        }
+    }
+
     public Object removeProperty(String name) {
         return getProperties().remove(name);
     }
@@ -253,6 +281,11 @@
         return getException() != null;
     }
 
+    public boolean isTransacted() {
+        ExchangeProperty<?> property = ExchangeProperty.get("transacted");
+        return property != null && property.get(this) == Boolean.TRUE;
+    }
+
     public UnitOfWork getUnitOfWork() {
         return unitOfWork;
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Mon Jun 30 23:32:47 2008
@@ -21,6 +21,7 @@
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
@@ -93,7 +94,6 @@
     public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData
data) {
 
         while (true) {
-
             // We can't keep retrying if the route is being shutdown.
             if (!isRunAllowed()) {
                 if (exchange.getException() == null) {
@@ -103,6 +103,8 @@
                 return data.sync;
             }
 
+            resetMaxDeliveryIfTransacted(exchange);
+
             if (exchange.getException() != null) {
                 Throwable e = exchange.getException();
                 exchange.setException(null); // Reset it since we are handling it.
@@ -143,6 +145,7 @@
 
             exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, exchange.getException());
             exchange.setException(null);
+            
             boolean sync = outputAsync.process(exchange, new AsyncCallback() {
                 public void done(boolean sync) {
                     // Only handle the async case...
@@ -171,6 +174,12 @@
 
     }
 
+    public void resetMaxDeliveryIfTransacted(Exchange exchange) {
+        if (exchange.isTransacted()) {
+            redeliveryPolicy.setMaximumRedeliveries(1);
+        }
+    }
+
     public static boolean isFailureHandled(Exchange exchange) {
         return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null;
     }

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java?rev=673008&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
(added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ExchangePropertyTest.java
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import org.apache.camel.impl.DefaultExchange;
+
+public class ExchangePropertyTest extends ExchangeTestSupport {
+    protected static final String P1_NAME = "org.apache.myproject.mypackage.myproperty1";
+    protected static final String P2_NAME = "org.apache.myproject.mypackage.myproperty2";
+    
+    protected Exchange exchange;
+
+    public void testExchangePropertyRegistry() throws Exception {
+        ExchangeProperty<Boolean> myProperty1 = 
+            new ExchangeProperty<Boolean>("myProperty1", P1_NAME, Boolean.class);
+        
+        assertEquals(ExchangeProperty.get("myProperty1"), myProperty1);
+        assertEquals(ExchangeProperty.values().length, 1); 
+        assertEquals(ExchangeProperty.values()[0], myProperty1);
+
+        ExchangeProperty<Boolean> myProperty2 = 
+            new ExchangeProperty<Boolean>("myProperty2", P2_NAME, Boolean.class);
+        
+        assertEquals(ExchangeProperty.get("myProperty2"), myProperty2);
+        assertEquals(ExchangeProperty.values().length, 2); 
+        assertEquals(ExchangeProperty.values()[1], myProperty2);
+
+        try {
+            ExchangeProperty<Boolean> rejectedProperty = 
+                new ExchangeProperty<Boolean>("myProperty2", P2_NAME, Boolean.class);
+            fail("Expected RuntimeCamelException to be thrown due to duplicate property "
+                 + " registration attempt");
+        } catch (RuntimeCamelException e) {
+            assertEquals(ExchangeProperty.values().length, 2);
+        } catch (Throwable t) {
+            fail("Expected RuntimeCamelException to be thrown due to duplicate propery "
+                    + " registration attempt");
+        }
+        ExchangeProperty.deregister(myProperty1);
+        assertEquals(ExchangeProperty.get("myProperty1"), null);
+        ExchangeProperty.deregister("myProperty2");
+        assertEquals(ExchangeProperty.get("myProperty2"), null);
+        assertEquals(ExchangeProperty.values().length, 0);
+    }
+    
+    public void testExchangePropertySetterGetter() throws Exception {
+        Exchange exchange = createExchange();
+
+        ExchangeProperty<Boolean> myProperty1 = 
+            new ExchangeProperty<Boolean>("myProperty1", P1_NAME, Boolean.class);
+
+        ExchangeProperty<String> myProperty2 = 
+            new ExchangeProperty<String>("myProperty2", P2_NAME, String.class);
+        
+        myProperty1.set(exchange, Boolean.TRUE);
+        assertTrue("Unexpected property value", 
+                    myProperty1.get(exchange) == Boolean.TRUE);
+        assertTrue("Unexpected property value", 
+                    ExchangeProperty.get("myProperty1").get(exchange) == Boolean.TRUE);
+        
+        myProperty2.set(exchange, "camel");
+        assertTrue("Unexpected property value", 
+                    myProperty2.get(exchange).equals("camel"));
+        assertTrue("Unexpected property value", 
+                    ExchangeProperty.get("myProperty2").get(exchange).equals("camel"));
+        
+        ExchangeProperty.deregister(myProperty1);
+        assertEquals(ExchangeProperty.get("myProperty1"), null);
+        ExchangeProperty.deregister("myProperty2");
+        assertEquals(ExchangeProperty.get("myProperty2"), null);
+        assertEquals(ExchangeProperty.values().length, 0);
+    }
+
+    public void testExchangePropertyTypeSafety() throws Exception {
+        Exchange exchange = createExchange();
+        ExchangeProperty<Boolean> myProperty1 = 
+            new ExchangeProperty<Boolean>("myProperty1", P1_NAME, Boolean.class);
+        try {
+            exchange.setProperty(P1_NAME, "camel");
+            fail("Expected RuntimeCamelException to be thrown due to property value type
cast violation");
+        } catch (RuntimeCamelException e) {
+            // complete
+        } catch (Throwable t) {
+            fail("Expected RuntimeCamelException to be thrown due to property value type
cast violation");
+        }
+        
+        myProperty1.set(exchange, Boolean.TRUE);
+        
+        assertTrue("Unexpected property value", 
+                myProperty1.get(exchange) == Boolean.TRUE);
+        assertTrue("Unexpected property value", 
+                ExchangeProperty.get("myProperty1").get(exchange) == Boolean.TRUE);
+
+        ExchangeProperty.deregister(myProperty1);
+        assertEquals(ExchangeProperty.get("myProperty1"), null);
+        assertEquals(ExchangeProperty.values().length, 0);
+    }
+}

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Mon Jun 30 23:32:47 2008
@@ -55,9 +55,10 @@
     }
 
     public void onMessage(final Message message) {
+        RuntimeCamelException rce = null;
         try {
             if (LOG.isDebugEnabled()) {
-                LOG.debug(endpoint + " receiving JMS message: " + message);
+                LOG.debug(endpoint + " consumer receiving JMS message: " + message);
             }
             Destination replyDestination = getReplyToDestination(message);
             final JmsExchange exchange = createExchange(message, replyDestination);
@@ -65,13 +66,20 @@
                 exchange.getIn().getHeaders();
             }
             processor.process(exchange);
-
             final JmsMessage out = exchange.getOut(false);
-            if (out != null && !disableReplyTo) {
+            if (exchange.getException() != null) {
+                rce = new RuntimeCamelException(exchange.getException());
+            }
+            if (rce == null && out != null && !disableReplyTo) {
                 sendReply(replyDestination, message, exchange, out);
             }
         } catch (Exception e) {
-            throw new RuntimeCamelException(e);
+            rce = new RuntimeCamelException(e);
+        }
+        if (rce != null) {
+            LOG.warn(endpoint + " consumer caught an exception while processing "
+                     + "JMS message: " + message, rce);
+            throw rce;
         }
     }
 

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
Mon Jun 30 23:32:47 2008
@@ -30,23 +30,31 @@
 
     private static final transient Log LOG = LogFactory.getLog(ConditionalExceptionProcessor.class);
     private int count;
+    private int maxCalls;
+    private String errorMsg;
 
     public ConditionalExceptionProcessor() {
+        this.maxCalls = 1;
+    }
+
+    public ConditionalExceptionProcessor(int maxCalls) {
+        this.maxCalls = maxCalls;
     }
 
     public void process(Exchange exchange) throws Exception {
         setCount(getCount() + 1);
 
-        AbstractTransactionTest.assertTrue("Expected only 2 calls to process() but encountered
"
-            + getCount() + ". There should be 1 for intentionally triggered rollback, and
1 for redelivery.",
-            getCount() <= 2);
-
+        if (getCount() > maxCalls * 2) {
+            errorMsg = "Expected only " + maxCalls * 2 + " calls to process() but encountered
"
+            + getCount() + ". There should be 1 for intentionally triggered rollback, and
1 for redelivery for each call.";
+        }
+         
         // should be printed 2 times due to one re-delivery after one failure
-        LOG.info("Exchange[" + getCount() + "][" + ((getCount() <= 1) ? "Should rollback"
: "Should succeed")
+        LOG.info("Exchange[" + getCount() + "][" + ((getCount() % 2 != 0) ? "Should rollback"
: "Should succeed")
             + "] = " + exchange);
 
-        // force rollback on the second attempt
-        if (getCount() <= 1) {
+        // force rollback on every mod 2 attempt
+        if (getCount() % 2 != 0) {
             throw new Exception("Rollback should be intentionally triggered: count[" + getCount()
+ "].");
         }
     }
@@ -58,4 +66,8 @@
     public int getCount() {
         return count;
     }
+    
+    public String getErrorMessage() {
+        return errorMsg;
+    }
 }

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
Mon Jun 30 23:32:47 2008
@@ -44,14 +44,13 @@
     public void testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector()
throws Exception {
 
         JmsComponent c = (JmsComponent)context.getComponent("activemq");
-        // c.getConfiguration().setRequestTimeout(600000);
         JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
-        
+        final ConditionalExceptionProcessor cp = new ConditionalExceptionProcessor(10);
         context.addRoutes(new SpringRouteBuilder() {
             @Override
             public void configure() throws Exception {
                 Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED_POLICY");
-                from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(new
ConditionalExceptionProcessor()).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+                from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(cp).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
                 from("activemq-1:queue:bar").process(new Processor() {
                     public void process(Exchange e) {
                         String request = e.getIn().getBody(String.class);
@@ -66,8 +65,11 @@
             }
         });
 
-        Object reply = template.requestBody("activemq:queue:foo", "blah");
-        assertTrue("Received unexpeced reply", reply.equals("Re: blah"));
+        for (int i = 0; i < 10; ++i) {
+            Object reply = template.requestBody("activemq:queue:foo", "blah" + i);
+            assertTrue("Received unexpeced reply", reply.equals("Re: blah" + i));
+            assertTrue(cp.getErrorMessage(), cp.getErrorMessage() == null);
+        }
     }
 /*
  * This is a working test but is commented out because there is bug in that ConditionalExceptionProcessor


Modified: activemq/camel/trunk/components/camel-spring/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/pom.xml?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/pom.xml (original)
+++ activemq/camel/trunk/components/camel-spring/pom.xml Mon Jun 30 23:32:47 2008
@@ -104,6 +104,17 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-jdbc</artifactId>
+      <version>${spring-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <scope>test</scope>

Modified: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java?rev=673008&r1=673007&r2=673008&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
(original)
+++ activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
Mon Jun 30 23:32:47 2008
@@ -17,46 +17,100 @@
 package org.apache.camel.spring.spi;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.processor.DelegateProcessor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.DefaultTransactionStatus;
 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
 import org.springframework.transaction.support.TransactionTemplate;
 
 /**
+ * The <a href="http://activemq.apache.org/camel/transactional-client.html">Transactional
Client</a>
+ * EIP pattern.
+ *
  * @version $Revision$
  */
 public class TransactionInterceptor extends DelegateProcessor {
     private static final transient Log LOG = LogFactory.getLog(TransactionInterceptor.class);
     private final TransactionTemplate transactionTemplate;
 
+    public static final ExchangeProperty<Boolean> TRANSACTED =
+        new ExchangeProperty<Boolean>("transacted", "org.apache.camel.transacted",
Boolean.class);
+
     public TransactionInterceptor(TransactionTemplate transactionTemplate) {
         this.transactionTemplate = transactionTemplate;
     }
 
     public void process(final Exchange exchange) {
-        LOG.info("transaction begin");
+        LOG.debug("Transaction begin");
 
         transactionTemplate.execute(new TransactionCallbackWithoutResult() {
             protected void doInTransactionWithoutResult(TransactionStatus status) {
+                // wrapper exception to throw if the exchange failed
+                // IMPORTANT: Must be a runtime exception to let Spring regard it as to do
"rollback"
+                RuntimeCamelException rce = null;
+
+                boolean activeTx = false;
                 try {
+                    // find out if there is an actual transaction alive, and thus we are
in transacted mode
+                    activeTx = TransactionSynchronizationManager.isActualTransactionActive();
+                    if (!activeTx) {
+                        activeTx = status.isNewTransaction() && !status.isCompleted();
+                        if (!activeTx) {
+                            if (DefaultTransactionStatus.class.isAssignableFrom(status.getClass()))
{
+                                DefaultTransactionStatus defStatus = DefaultTransactionStatus.class
+                                    .cast(status);
+                                activeTx = defStatus.hasTransaction() && !status.isCompleted();
+                            }
+                        }
+                    }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Is actual transaction active: " + activeTx);
+                    }
+
+                    // okay mark the exchange as transacted, then the DeadLetterChannel or
others know
+                    // its an transacted exchange
+                    if (activeTx) {
+                        TRANSACTED.set(exchange, Boolean.TRUE);
+                    }
+
+
+                    // process the exchange
                     processNext(exchange);
+
+                    // wrap if the exchange failed with an exception
+                    if (exchange.getException() != null) {
+                        rce = new RuntimeCamelException(exchange.getException());
+                    }
                 } catch (Exception e) {
-                    throw new RuntimeCamelException(e);
+                     // wrap if the exchange threw an exception
+                    rce = new RuntimeCamelException(e);
+                }
+                
+                // rehrow exception if the exchange failed
+                if (rce != null) {
+                    if (activeTx) {
+                        status.setRollbackOnly();
+                        LOG.debug("Transaction rollback");
+                    }
+                    throw rce;
                 }
             }
         });
-
-        LOG.info("transaction commit");
+        
+        LOG.debug("Transaction commit");
     }
 
     @Override
     public String toString() {
-        return "TransactionInterceptor:" + propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
+ "[" + getProcessor() + "]";
+        return "TransactionInterceptor:"
+            + propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
+            + "[" + getProcessor() + "]";
     }
 
     private String propagationBehaviorToString(int propagationBehavior) {
@@ -84,8 +138,9 @@
             rc = "PROPAGATION_SUPPORTS";
             break;
         default:
-            rc = "UNKOWN"; 
+            rc = "UNKOWN";
         }
         return rc;
     }
+
 }

Added: activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java?rev=673008&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
(added)
+++ activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/BookService.java
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import javax.sql.DataSource;
+
+import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
+
+/**
+ * Used for unit testing
+ */
+// START SNIPPET: e1
+public class BookService {
+
+    private SimpleJdbcTemplate jdbc;
+
+    public BookService() {
+    }
+
+    public void setDataSource(DataSource ds) {
+        jdbc = new SimpleJdbcTemplate(ds);
+    }
+
+    public void orderBook(String title) throws Exception {
+        if (title.startsWith("Donkey")) {
+            throw new IllegalArgumentException("We don't have Donkeys, only Camels");
+        }
+
+        // create new local datasource to store in DB
+        jdbc.update("insert into books (title) values (?)", title);
+    }
+}
+// END SNIPPET: e1

Added: activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java?rev=673008&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
(added)
+++ activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTest.java
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.interceptor;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.SpringTestSupport;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Unit test to demonstrate the transactional client pattern.
+ */
+public class TransactionalClientDataSourceTest extends SpringTestSupport {
+
+    private JdbcTemplate jdbc;
+
+    protected ClassPathXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext(
+            "/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml");
+    }
+
+    protected int getExpectedRouteCount() {
+        return 0;
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        this.disableJMX();
+        super.setUp();
+
+        // START SNIPPET: e5
+        // create database and insert dummy data
+        final DataSource ds = getMandatoryBean(DataSource.class, "dataSource");
+        jdbc = new JdbcTemplate(ds);
+        jdbc.execute("create table books (title varchar(50))");
+        jdbc.update("insert into books (title) values (?)", new Object[] {"Camel in Action"});
+        // END SNIPPET: e5
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        jdbc.execute("drop table books");
+        this.enableJMX();
+    }
+
+    // START SNIPPET: e3
+    public void testTransactionSuccess() throws Exception {
+        template.sendBody("direct:okay", "Hello World");
+
+        int count = jdbc.queryForInt("select count(*) from books");
+        assertEquals("Number of books", 3, count);
+    }
+    // END SNIPPET: e3
+
+    // START SNIPPET: e4
+    public void testTransactionRollback() throws Exception {
+        template.sendBody("direct:fail", "Hello World");
+
+        int count = jdbc.queryForInt("select count(*) from books");
+        assertEquals("Number of books", 1, count);
+    }
+    // END SNIPPET: e4
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // setup the transaction policy
+                TransactionTemplate tt = context.getRegistry()
+                    .lookup("PROPAGATION_REQUIRED", TransactionTemplate.class);
+                SpringTransactionPolicy required = new SpringTransactionPolicy(tt);
+
+                // set the required policy for this route
+                from("direct:okay").policy(required).
+                    setBody(constant("Tiger in Action")).beanRef("bookService").
+                    setBody(constant("Elephant in Action")).beanRef("bookService");
+                // END SNIPPET: e1
+
+                // START SNIPPET: e2
+                // set the required policy for this route
+                from("direct:fail").policy(required).
+                    setBody(constant("Tiger in Action")).beanRef("bookService").
+                    setBody(constant("Donkey in Action")).beanRef("bookService");
+                // END SNIPPET: e2
+            }
+        };
+    }
+
+}

Added: activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml?rev=673008&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
(added)
+++ activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/transactionalClientDataSource.xml
Mon Jun 30 23:32:47 2008
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+         http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+    <!-- datasource to the database -->
+    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
+        <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+        <property name="url" value="jdbc:hsqldb:mem:camel"/>
+        <property name="username" value="sa"/>
+        <property name="password" value=""/>
+    </bean>
+
+    <!-- spring transaction manager -->
+    <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+        <property name="dataSource" ref="dataSource"/>
+    </bean>
+
+    <!-- policy for required transaction used in our Camel routes -->
+    <bean id="PROPAGATION_REQUIRED" class="org.springframework.transaction.support.TransactionTemplate">
+        <property name="transactionManager" ref="txManager"/>
+    </bean>
+
+    <!-- bean for book business logic -->
+    <bean id="bookService" class="org.apache.camel.spring.interceptor.BookService">
+        <property name="dataSource" ref="dataSource"/>
+    </bean>
+
+</beans>



Mime
View raw message