servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r411741 [2/2] - in /incubator/servicemix/trunk/servicemix-eip: ./ src/main/java/org/apache/servicemix/eip/ src/main/java/org/apache/servicemix/eip/patterns/ src/main/java/org/apache/servicemix/eip/support/ src/test/java/org/apache/servicemi...
Date Mon, 05 Jun 2006 10:57:20 GMT
Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java Mon Jun  5 03:57:19 2006
@@ -23,13 +23,10 @@
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.RobustInOnly;
 import javax.xml.transform.Source;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
 
 /**
@@ -47,8 +44,6 @@
     public static final String SPLITTER_INDEX = "org.apache.servicemix.eip.splitter.index";
     public static final String SPLITTER_CORRID = "org.apache.servicemix.eip.splitter.corrid";
 
-    private static final Log log = LogFactory.getLog(AbstractSplitter.class);
-
     /**
      * The address of the target endpoint
      */
@@ -75,8 +70,26 @@
      * The correlation property used by this component
      */
     private String correlation;
+    /**
+     * Specifies whether exchanges for all parts are sent synchronously or not.
+     */
+    private boolean synchronous;
     
     /**
+     * @return whether exchanges for all parts are sent synchronously
+     */
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    /**
+     * @param synchronous whether exchanges for all parts are sent synchronously
+     */
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
+    /**
      * @return Returns the reportErrors.
      */
     public boolean isReportErrors() {
@@ -146,44 +159,75 @@
     }
     
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
      */
-    public void process(MessageExchange exchange) throws MessagingException {
-        try {
-            // If we need to report errors, the behavior is really different,
-            // as we need to keep the incoming exchange in the store until
-            // all acks have been received
-            if (reportErrors) {
-                // TODO: implement this
-                throw new UnsupportedOperationException("Not implemented");
-            // We are in a simple fire-and-forget behaviour.
-            // This implementation is really efficient as we do not use
-            // the store at all.
-            } else {
-                if (exchange.getStatus() == ExchangeStatus.DONE) {
-                    return;
-                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                    return;
-                } else if (exchange instanceof InOnly == false &&
-                           exchange instanceof RobustInOnly == false) {
-                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
-                } else if (exchange.getFault() != null) {
-                    done(exchange);
-                } else {
-                    MessageExchange[] parts = createParts(exchange);
-                    for (int i = 0; i < parts.length; i++) {
-                        target.configureTarget(parts[i], getContext());
-                        send(parts[i]);
+    protected void processSync(MessageExchange exchange) throws Exception {
+        if (exchange instanceof InOnly == false && 
+            exchange instanceof RobustInOnly == false) {
+            fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+            return;
+        }
+        MessageExchange[] parts = createParts(exchange);
+        for (int i = 0; i < parts.length; i++) {
+            target.configureTarget(parts[i], getContext());
+            if (reportErrors || isSynchronous()) {
+                sendSync(parts[i]);
+                if (parts[i].getStatus() == ExchangeStatus.DONE) {
+                    // nothing to do
+                } else if (parts[i].getStatus() == ExchangeStatus.ERROR) {
+                    if (reportErrors) {
+                        fail(exchange, parts[i].getError());
+                        return;
+                    }
+                } else if (parts[i].getFault() != null) {
+                    if (reportErrors) {
+                        MessageUtil.transferToFault(MessageUtil.copyFault(parts[i]), exchange);
+                        done(parts[i]);
+                        sendSync(exchange);
+                        return;
+                    } else {
+                        done(parts[i]);
                     }
-                    done(exchange);
+                } else {
+                    throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Fault message");
                 }
+            } else {
+                send(parts[i]);
             }
-        // If an error occurs, log it and report the error back to the sender
-        // if the exchange is still ACTIVE 
-        } catch (Exception e) {
-            log.error("An exception occured while processing exchange", e);
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                fail(exchange, e);
+        }
+        done(exchange);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        // If we need to report errors, the behavior is really different,
+        // as we need to keep the incoming exchange in the store until
+        // all acks have been received
+        if (reportErrors) {
+            // TODO: implement this
+            throw new UnsupportedOperationException("Not implemented");
+        // We are in a simple fire-and-forget behaviour.
+        // This implementation is really efficient as we do not use
+        // the store at all.
+        } else {
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                return;
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                return;
+            } else if (exchange instanceof InOnly == false &&
+                       exchange instanceof RobustInOnly == false) {
+                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+            } else if (exchange.getFault() != null) {
+                done(exchange);
+            } else {
+                MessageExchange[] parts = createParts(exchange);
+                for (int i = 0; i < parts.length; i++) {
+                    target.configureTarget(parts[i], getContext());
+                    send(parts[i]);
+                }
+                done(exchange);
             }
         }
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java?rev=411741&r1=411740&r2=411741&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTest.java Mon Jun  5 03:57:19 2006
@@ -31,6 +31,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.MessageExchangeListener;
 import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.components.util.ComponentSupport;
@@ -116,14 +117,25 @@
     protected static class ReturnOutComponent extends ComponentSupport implements MessageExchangeListener {
         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
                 if (exchange.getMessage("out") == null) {
                     NormalizedMessage out = exchange.createMessage();
                     out.setContent(createSource("<outMsg/>"));
-                    answer(exchange, out);
+                    exchange.setMessage(out, "out");
+                    if (txSync) {
+                        sendSync(exchange);
+                    } else {
+                        send(exchange);
+                    }
                 } else if (exchange.getFault() == null) {
                     Fault fault = exchange.createFault();
                     fault.setContent(createSource("<fault/>"));
-                    fail(exchange, fault);
+                    exchange.setMessage(fault, "fault");
+                    if (txSync) {
+                        sendSync(exchange);
+                    } else {
+                        send(exchange);
+                    }
                 } else {
                     done(exchange);
                 }
@@ -138,9 +150,15 @@
         }
         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
                 NormalizedMessage out = exchange.createMessage();
                 out.setContent(createSource(response));
-                answer(exchange, out);
+                exchange.setMessage(out, "out");
+                if (txSync) {
+                    sendSync(exchange);
+                } else {
+                    send(exchange);
+                }
             }
         }
     }
@@ -149,9 +167,15 @@
         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
                 if (exchange.getMessage("out") == null) {
+                    boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
                     NormalizedMessage out = exchange.createMessage();
                     out.setContent(createSource("<outMsg/>"));
-                    answer(exchange, out);
+                    exchange.setMessage(out, "out");
+                    if (txSync) {
+                        sendSync(exchange);
+                    } else {
+                        send(exchange);
+                    }
                 } else {
                     fail(exchange, new Exception("Dummy error"));
                 }

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTransactionalTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTransactionalTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTransactionalTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/AbstractEIPTransactionalTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,111 @@
+package org.apache.servicemix.eip;
+
+import javax.resource.spi.ConnectionManager;
+import javax.resource.spi.ManagedConnectionFactory;
+import javax.sql.DataSource;
+import javax.sql.XADataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+import org.apache.geronimo.connector.outbound.GenericConnectionManager;
+import org.apache.geronimo.connector.outbound.connectionmanagerconfig.NoPool;
+import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
+import org.apache.geronimo.transaction.context.GeronimoTransactionManager;
+import org.apache.geronimo.transaction.context.TransactionContextManager;
+import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.nmr.flow.Flow;
+import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow;
+import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.jdbc.JdbcStoreFactory;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.tranql.connector.AllExceptionsAreFatalSorter;
+import org.tranql.connector.jdbc.AbstractXADataSourceMCF;
+
+public abstract class AbstractEIPTransactionalTest extends AbstractEIPTest {
+
+    protected BrokerService broker;
+    protected TransactionManager tm;
+    protected DataSource dataSource;
+    protected Store store;
+    
+    protected void setUp() throws Exception {
+        // Create an AMQ broker
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        
+        TransactionManagerImpl exTransactionManager = new TransactionManagerImpl(600, null, null);
+        TransactionContextManager transactionContextManager = new TransactionContextManager(exTransactionManager, exTransactionManager);
+        tm = (TransactionManager) new GeronimoTransactionManager(transactionContextManager);
+        
+        // Create an embedded database for testing tx results when commit / rollback
+        ConnectionManager cm = new GenericConnectionManager(
+                        new XATransactions(true, true),
+                        new NoPool(),
+                        false,
+                        null,
+                        transactionContextManager,
+                        "connectionManager",
+                        GenericConnectionManager.class.getClassLoader());
+        ManagedConnectionFactory mcf = new DerbyDataSourceMCF("target/testdb");
+        dataSource = (DataSource) mcf.createConnectionFactory(cm);
+
+        JdbcStoreFactory storeFactory = new JdbcStoreFactory();
+        storeFactory.setDataSource(dataSource);
+        storeFactory.setTransactional(true);
+        store = storeFactory.open("store");
+        
+        JCAFlow jcaFlow = new JCAFlow();
+        jcaFlow.setTransactionContextManager(transactionContextManager);
+        
+        jbi = new JBIContainer();
+        jbi.setFlows(new Flow[] { new SedaFlow(), jcaFlow });
+        jbi.setEmbedded(true);
+        jbi.setUseMBeanServer(false);
+        jbi.setCreateMBeanServer(false);
+        jbi.setTransactionManager(tm);
+        jbi.setAutoEnlistInTransaction(true);
+        listener = new ExchangeCompletedListener();
+        jbi.addListener(listener);
+        jbi.init();
+        jbi.start();
+
+        client = new DefaultServiceMixClient(jbi);
+    }
+    
+    protected void tearDown() throws Exception {
+        listener.assertExchangeCompleted();
+        jbi.shutDown();
+        broker.stop();
+    }
+
+    protected void configurePattern(EIPEndpoint endpoint) {
+        endpoint.setStore(store);
+    }
+    
+    public static class DerbyDataSourceMCF extends AbstractXADataSourceMCF {
+        private static final long serialVersionUID = 7971682207810098396L;
+        protected DerbyDataSourceMCF(String dbName) {
+            super(createXADS(dbName), new AllExceptionsAreFatalSorter());
+        }
+        public String getPassword() {
+            return null;
+        }
+        public String getUserName() {
+            return null;
+        }
+        protected static XADataSource createXADS(String dbName) {
+            EmbeddedXADataSource xads = new EmbeddedXADataSource();
+            xads.setDatabaseName(dbName);
+            xads.setCreateDatabase("create");
+            return xads;
+        }
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/ContentBasedRouterTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/ContentBasedRouterTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/ContentBasedRouterTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/ContentBasedRouterTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.ContentBasedRouter;
+import org.apache.servicemix.eip.support.RoutingRule;
+import org.apache.servicemix.eip.support.XPathPredicate;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.w3c.dom.Node;
+
+public class ContentBasedRouterTxTest extends AbstractEIPTransactionalTest {
+
+    private ContentBasedRouter router;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        router = new ContentBasedRouter();
+        router.setRules(new RoutingRule[] {
+                new RoutingRule(
+                        new XPathPredicate("/hello/@id = '1'"),
+                        createServiceExchangeTarget(new QName("target1"))),
+                new RoutingRule(
+                        new XPathPredicate("/hello/@id = '2'"),
+                        createServiceExchangeTarget(new QName("target2"))),
+                new RoutingRule(
+                        null,
+                        createServiceExchangeTarget(new QName("target3")))
+        });
+        configurePattern(router);
+        activateComponent(router, "router");
+    }
+    
+    public void testInOnlySync() throws Exception {
+        ReceiverComponent rec1 = activateReceiver("target1");
+        ReceiverComponent rec2 = activateReceiver("target2");
+        ReceiverComponent rec3 = activateReceiver("target3");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='1' />"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        rec1.getMessageList().assertMessagesReceived(1); 
+        rec2.getMessageList().assertMessagesReceived(0);
+        rec3.getMessageList().assertMessagesReceived(0);
+
+        rec1.getMessageList().flushMessages();
+        rec2.getMessageList().flushMessages();
+        rec3.getMessageList().flushMessages();
+        
+        me = client.createInOnlyExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='2' />"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        rec1.getMessageList().assertMessagesReceived(0);
+        rec2.getMessageList().assertMessagesReceived(1);
+        rec3.getMessageList().assertMessagesReceived(0);
+
+        rec1.getMessageList().flushMessages();
+        rec2.getMessageList().flushMessages();
+        rec3.getMessageList().flushMessages();
+
+        me = client.createInOnlyExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='3' />"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        rec1.getMessageList().assertMessagesReceived(0);
+        rec2.getMessageList().assertMessagesReceived(0);
+        rec3.getMessageList().assertMessagesReceived(1);
+
+        tm.commit();
+    }
+
+    public void testInOnlyAsync() throws Exception {
+        ReceiverComponent rec1 = activateReceiver("target1");
+        ReceiverComponent rec2 = activateReceiver("target2");
+        ReceiverComponent rec3 = activateReceiver("target3");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='1' />"));
+        client.send(me);
+        
+        me = client.createInOnlyExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='2' />"));
+        client.send(me);
+        
+        me = client.createInOnlyExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='3' />"));
+        client.send(me);
+        
+        tm.commit();
+        
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        rec1.getMessageList().assertMessagesReceived(1); 
+        rec2.getMessageList().assertMessagesReceived(1);
+        rec3.getMessageList().assertMessagesReceived(1);
+    }
+
+    public void testInOutSync() throws Exception {
+        tm.begin();
+        
+        activateComponent(new ReturnMockComponent("<from1/>"), "target1");
+        activateComponent(new ReturnMockComponent("<from2/>"), "target2");
+        activateComponent(new ReturnMockComponent("<from3/>"), "target3");
+        
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='1' />"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        Node node = new SourceTransformer().toDOMNode(me.getOutMessage());
+        assertEquals("from1", node.getFirstChild().getNodeName());
+        client.done(me);
+        
+        me = client.createInOutExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='2' />"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        node = new SourceTransformer().toDOMNode(me.getOutMessage());
+        assertEquals("from2", node.getFirstChild().getNodeName());
+        client.done(me);
+        
+        me = client.createInOutExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='3' />"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        node = new SourceTransformer().toDOMNode(me.getOutMessage());
+        assertEquals("from3", node.getFirstChild().getNodeName());
+        client.done(me);
+        
+        tm.commit();
+    }
+
+    public void testInOutAsync() throws Exception {
+        activateComponent(new ReturnMockComponent("<from1/>"), "target1");
+        activateComponent(new ReturnMockComponent("<from2/>"), "target2");
+        activateComponent(new ReturnMockComponent("<from3/>"), "target3");
+        
+        tm.begin();
+        
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='1' />"));
+        client.send(me);
+        
+        tm.commit();
+        
+        me = (InOut) client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        Node node = new SourceTransformer().toDOMNode(me.getOutMessage());
+        assertEquals("from1", node.getFirstChild().getNodeName());
+        client.done(me);
+        
+        tm.begin();
+        
+        me = client.createInOutExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='2' />"));
+        client.send(me);
+        
+        tm.commit();
+        
+        me = (InOut) client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        node = new SourceTransformer().toDOMNode(me.getOutMessage());
+        assertEquals("from2", node.getFirstChild().getNodeName());
+        client.done(me);
+        
+        tm.begin();
+        
+        me = client.createInOutExchange();
+        me.setService(new QName("router"));
+        me.getInMessage().setContent(createSource("<hello id='3' />"));
+        client.send(me);
+        
+        tm.commit();
+        
+        me = (InOut) client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        node = new SourceTransformer().toDOMNode(me.getOutMessage());
+        assertEquals("from3", node.getFirstChild().getNodeName());
+        client.done(me);
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/MessageFilterTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/MessageFilterTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/MessageFilterTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/MessageFilterTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.MessageFilter;
+import org.apache.servicemix.eip.support.XPathPredicate;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class MessageFilterTxTest extends AbstractEIPTransactionalTest {
+
+    protected MessageFilter messageFilter;
+    
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        messageFilter = new MessageFilter();
+        messageFilter.setFilter(new XPathPredicate("/hello/@id = '1'"));
+        messageFilter.setTarget(createServiceExchangeTarget(new QName("target")));
+        configurePattern(messageFilter);
+        activateComponent(messageFilter, "messageFilter");
+    }
+    
+    public void testInOnlySync() throws Exception {
+        ReceiverComponent rec = activateReceiver("target");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("messageFilter"));
+        me.getInMessage().setContent(createSource("<hello><one/><two/><three/></hello>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        tm.commit();
+        
+        rec.getMessageList().assertMessagesReceived(0); 
+
+        tm.begin();
+        
+        me = client.createInOnlyExchange();
+        me.setService(new QName("messageFilter"));
+        me.getInMessage().setContent(createSource("<hello id='1'><one/><two/><three/></hello>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        tm.commit();
+        
+        rec.getMessageList().assertMessagesReceived(1); 
+    }
+
+    public void testInOnlyAsync() throws Exception {
+        ReceiverComponent rec = activateReceiver("target");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("messageFilter"));
+        me.getInMessage().setContent(createSource("<hello><one/><two/><three/></hello>"));
+        client.send(me);
+
+        me = client.createInOnlyExchange();
+        me.setService(new QName("messageFilter"));
+        me.getInMessage().setContent(createSource("<hello id='1'><one/><two/><three/></hello>"));
+        client.send(me);
+
+        tm.commit();
+
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+
+        rec.getMessageList().assertMessagesReceived(1); 
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/PipelineTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.Pipeline;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class PipelineTxTest extends AbstractEIPTransactionalTest {
+    // Transactional tests for the Pipeline pattern: one InOnly exchange sent to "pipeline" must yield one message at "target".
+    protected Pipeline pipeline;
+    // Creates a Pipeline whose transformer is the "transformer" service and whose target is the "target" service, activated as "pipeline".
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        pipeline = new Pipeline();
+        pipeline.setTransformer(createServiceExchangeTarget(new QName("transformer")));
+        pipeline.setTarget(createServiceExchangeTarget(new QName("target")));
+        configurePattern(pipeline);
+        activateComponent(pipeline, "pipeline");
+    }
+    // Synchronous case: sendSync inside the transaction, DONE status checked before the commit.
+    public void testInOnlySync() throws Exception {
+        activateComponent(new ReturnOutComponent(), "transformer");
+        ReceiverComponent target = activateReceiver("target");
+
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("pipeline"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        tm.commit();
+        // Delivery to "target" is asserted only once the transaction has been committed.
+        target.getMessageList().assertMessagesReceived(1);
+        
+        listener.assertExchangeCompleted();
+    }
+    // Asynchronous case: send inside the transaction, then receive the DONE exchange after the commit.
+    public void testInOnlyAsync() throws Exception {
+        activateComponent(new ReturnOutComponent(), "transformer");
+        ReceiverComponent target = activateReceiver("target");
+
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("pipeline"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        
+        tm.commit();
+        // The completed exchange is collected from the client after the commit.
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        target.getMessageList().assertMessagesReceived(1);
+        
+        listener.assertExchangeCompleted();
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.SplitAggregator;
+import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class SplitAggregatorTxTest extends AbstractEIPTransactionalTest {
+    // Transactional tests for SplitAggregator: fragments sharing one correlation id must produce a single message at "target".
+    private SplitAggregator aggregator;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        // The aggregator under test sends its result to the "target" service and is activated as "aggregator".
+        aggregator = new SplitAggregator();
+        aggregator.setTarget(createServiceExchangeTarget(new QName("target")));
+        configurePattern(aggregator);
+        activateComponent(aggregator, "aggregator");
+    }
+    // Sends the fragments (all when msgs is null, else only indices flagged true), each in its own transaction; returns the one message received at "target".
+    protected NormalizedMessage testRun(boolean[] msgs, boolean async) throws Exception {
+        ReceiverComponent rec = activateReceiver("target");
+        // nbMessages is both the advertised SPLITTER_COUNT and the loop bound, so the two cannot drift apart.
+        int nbMessages = 3;
+        String corrId = Long.toString(System.currentTimeMillis());
+        for (int i = 0; i < nbMessages; i++) {
+            if (msgs == null || msgs[i]) {
+                InOnly me = client.createInOnlyExchange();
+                me.setService(new QName("aggregator"));
+                me.getInMessage().setContent(createSource("<hello id='" + i + "' />"));
+                me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(nbMessages));
+                me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(i));
+                me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, corrId);
+                tm.begin();
+                if (async) {
+                    client.send(me);
+                } else {
+                    client.sendSync(me);
+                }
+                tm.commit();
+            }
+        }        
+        // Exactly one aggregated message is expected; it is flushed from the receiver and returned to the caller.
+        rec.getMessageList().assertMessagesReceived(1);
+        return (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+    }
+    // Runs the scenario with every fragment sent through client.send.
+    public void testAsync() throws Exception {
+        testRun(null, true);
+    }
+    // Runs the scenario with every fragment sent through client.sendSync.
+    public void testSync() throws Exception {
+        testRun(null, false);
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRecipientListTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRecipientListTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRecipientListTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRecipientListTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.StaticRecipientList;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class StaticRecipientListTxTest extends AbstractEIPTransactionalTest {
+    // Transactional tests for StaticRecipientList: one InOnly exchange sent to "recipientList" must reach all three recipients.
+    protected StaticRecipientList recipientList;
+    // Configures the list with the services "recipient1", "recipient2" and "recipient3" and activates it as "recipientList".
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        recipientList = new StaticRecipientList();
+        recipientList.setRecipients(
+                new ExchangeTarget[] {
+                        createServiceExchangeTarget(new QName("recipient1")),
+                        createServiceExchangeTarget(new QName("recipient2")),
+                        createServiceExchangeTarget(new QName("recipient3"))
+                });
+        configurePattern(recipientList);
+        activateComponent(recipientList, "recipientList");
+    }
+    // Synchronous case: sendSync inside the transaction, DONE status checked before the commit.
+    public void testInOnlySync() throws Exception {
+        ReceiverComponent r1 = activateReceiver("recipient1");
+        ReceiverComponent r2 = activateReceiver("recipient2");
+        ReceiverComponent r3 = activateReceiver("recipient3");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("recipientList"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        tm.commit();
+        // After the commit, each of the three recipients must have received exactly one message.
+        r1.getMessageList().assertMessagesReceived(1);
+        r2.getMessageList().assertMessagesReceived(1);
+        r3.getMessageList().assertMessagesReceived(1);
+    }
+    // Asynchronous case: send inside the transaction, then receive the DONE exchange after the commit.
+    public void testInOnlyAsync() throws Exception {
+        ReceiverComponent r1 = activateReceiver("recipient1");
+        ReceiverComponent r2 = activateReceiver("recipient2");
+        ReceiverComponent r3 = activateReceiver("recipient3");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("recipientList"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        
+        tm.commit();
+
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        // Each of the three recipients must have received exactly one message.
+        r1.getMessageList().assertMessagesReceived(1);
+        r2.getMessageList().assertMessagesReceived(1);
+        r3.getMessageList().assertMessagesReceived(1);
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRoutingSlipTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRoutingSlipTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRoutingSlipTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/StaticRoutingSlipTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOptionalOut;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.RobustInOnly;
+import javax.transaction.Status;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.StaticRoutingSlip;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+
+
+public class StaticRoutingSlipTxTest extends AbstractEIPTransactionalTest {
+    // Transactional tests for StaticRoutingSlip using InOut exchanges sent to the "routingSlip" service.
+    protected StaticRoutingSlip routingSlip;
+    // Configures the slip with the services "target1", "target2" and "target3" and activates it as "routingSlip".
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        routingSlip = new StaticRoutingSlip();
+        routingSlip.setTargets(
+                new ExchangeTarget[] {
+                        createServiceExchangeTarget(new QName("target1")),
+                        createServiceExchangeTarget(new QName("target2")),
+                        createServiceExchangeTarget(new QName("target3"))
+                });
+        configurePattern(routingSlip);
+        activateComponent(routingSlip, "routingSlip");
+    }
+    // Synchronous case: the whole InOut round trip, including done(), happens before the commit.
+    public void testSync() throws Exception {
+        activateComponent(new ReturnOutComponent(), "target1");
+        activateComponent(new ReturnOutComponent(), "target2");
+        activateComponent(new ReturnOutComponent(), "target3");
+
+        tm.begin();
+        
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("routingSlip"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getOutMessage());
+        client.done(me);
+        
+        tm.commit();
+        
+        listener.assertExchangeCompleted();
+    }
+    // Asynchronous case: the request is sent and committed first; the response is received and completed afterwards.
+    public void testAsync() throws Exception {
+        activateComponent(new ReturnOutComponent(), "target1");
+        activateComponent(new ReturnOutComponent(), "target2");
+        activateComponent(new ReturnOutComponent(), "target3");
+
+        tm.begin();
+        
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("routingSlip"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        
+        tm.commit();
+        // NOTE(review): tm is asserted to report STATUS_ACTIVE after the commit above and nothing visible here commits again -- confirm where that transaction ends.
+        me = (InOut) client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertEquals(Status.STATUS_ACTIVE, tm.getStatus());
+        assertNotNull(me.getOutMessage());
+        client.done(me);
+        
+        listener.assertExchangeCompleted();
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/WireTapTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.WireTap;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class WireTapTxTest extends AbstractEIPTransactionalTest {
+    // Transactional tests for WireTap with separate receivers registered as its in, out and fault listeners.
+    protected ReceiverComponent inReceiver;
+    protected ReceiverComponent outReceiver;
+    protected ReceiverComponent faultReceiver;
+    protected WireTap wireTap;
+    // Activates the "in", "out" and "fault" receivers, wires them as listeners, and activates the tap (target "target") as "wireTap".
+    protected void setUp() throws Exception {
+        super.setUp();
+        
+        inReceiver = activateReceiver("in");
+        outReceiver = activateReceiver("out");
+        faultReceiver = activateReceiver("fault");
+        wireTap = new WireTap();
+        wireTap.setInListener(createServiceExchangeTarget(new QName("in")));
+        wireTap.setOutListener(createServiceExchangeTarget(new QName("out")));
+        wireTap.setFaultListener(createServiceExchangeTarget(new QName("fault")));
+        wireTap.setTarget(createServiceExchangeTarget(new QName("target")));
+        configurePattern(wireTap);
+        activateComponent(wireTap, "wireTap");
+    }
+    // InOnly, synchronous: sendSync inside the transaction, DONE status checked before the commit.
+    public void testInOnlySync() throws Exception {
+        ReceiverComponent target = activateReceiver("target");
+
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        tm.commit();
+        // Expected deliveries: one at "target" and one at the "in" listener, none at the "out" or "fault" listeners.
+        target.getMessageList().assertMessagesReceived(1);
+        inReceiver.getMessageList().assertMessagesReceived(1);
+        outReceiver.getMessageList().assertMessagesReceived(0);
+        faultReceiver.getMessageList().assertMessagesReceived(0);
+    }
+    // InOnly, asynchronous: send inside the transaction, then receive the DONE exchange after the commit.
+    public void testInOnlyAsync() throws Exception {
+        ReceiverComponent target = activateReceiver("target");
+
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        
+        tm.commit();
+        
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        // Expected deliveries: one at "target" and one at the "in" listener, none at the "out" or "fault" listeners.
+        target.getMessageList().assertMessagesReceived(1);
+        inReceiver.getMessageList().assertMessagesReceived(1);
+        outReceiver.getMessageList().assertMessagesReceived(0);
+        faultReceiver.getMessageList().assertMessagesReceived(0);
+    }
+    // InOut, synchronous: the exchange must come back ACTIVE and is completed with done() before the commit.
+    public void testInOutSync() throws Exception {
+        activateComponent(new ReturnOutComponent(), "target");
+
+        tm.begin();
+        
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        client.done(me);
+        
+        tm.commit();
+    }
+    // InOut, asynchronous: the request is sent and committed; the response is received and completed afterwards.
+    public void testInOutAsync() throws Exception {
+        activateComponent(new ReturnOutComponent(), "target");
+
+        tm.begin();
+        
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        
+        tm.commit();
+        // NOTE(review): unlike testInOutSync, the exchange status is not asserted before done() -- confirm this is intentional.
+        me = (InOut) client.receive();
+        client.done(me);
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/XPathSplitterTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/XPathSplitterTxTest.java?rev=411741&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/XPathSplitterTxTest.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/XPathSplitterTxTest.java Mon Jun  5 03:57:19 2006
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.XPathSplitter;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class XPathSplitterTxTest extends AbstractEIPTransactionalTest {
+    // Transactional tests for XPathSplitter: one InOnly exchange sent to "splitter" must yield three messages at "target".
+    private XPathSplitter splitter;
+    // Configures the splitter with XPath "/hello/*" and the "target" service, then activates it as "splitter".
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        splitter = new XPathSplitter();
+        splitter.setTarget(createServiceExchangeTarget(new QName("target")));
+        splitter.setXPath("/hello/*");
+        configurePattern(splitter);
+        activateComponent(splitter, "splitter");
+    }
+    // Synchronous case: sendSync inside the transaction, DONE status checked before the commit.
+    public void testSync() throws Exception {
+        ReceiverComponent rec = activateReceiver("target");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("splitter"));
+        me.getInMessage().setContent(createSource("<hello><one/><two/><three/></hello>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        tm.commit();
+        // The payload has three child elements under /hello, and three messages are expected at "target".
+        rec.getMessageList().assertMessagesReceived(3); 
+    }
+    // Asynchronous case: send inside the transaction, then receive the DONE exchange after the commit.
+    public void testAsync() throws Exception {
+        ReceiverComponent rec = activateReceiver("target");
+        
+        tm.begin();
+        
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("splitter"));
+        me.getInMessage().setContent(createSource("<hello><one/><two/><three/></hello>"));
+        client.send(me);
+        
+        tm.commit();
+        
+        me = (InOnly) client.receive();
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        // The payload has three child elements under /hello, and three messages are expected at "target".
+        rec.getMessageList().assertMessagesReceived(3); 
+    }
+
+}



Mime
View raw message