servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r411195 [1/2] - in /incubator/servicemix/trunk/servicemix-core: ./ src/main/java/org/apache/servicemix/jbi/framework/ src/main/java/org/apache/servicemix/jbi/management/ src/main/java/org/apache/servicemix/jbi/messaging/ src/main/java/org/a...
Date Fri, 02 Jun 2006 15:59:40 GMT
Author: gnodet
Date: Fri Jun  2 08:59:39 2006
New Revision: 411195

URL: http://svn.apache.org/viewvc?rev=411195&view=rev
Log:
SM-443: Change transaction semantic using send/sendSync and improve transactions support

Added:
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java
Modified:
    incubator/servicemix/trunk/servicemix-core/pom.xml
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/management/MBeanServerContext.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowChooser.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaQueue.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/SedaFlowTransactionTest.java
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/StFlowTransactionTest.java
    incubator/servicemix/trunk/servicemix-core/src/test/resources/log4j.properties

Modified: incubator/servicemix/trunk/servicemix-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/pom.xml?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/pom.xml (original)
+++ incubator/servicemix/trunk/servicemix-core/pom.xml Fri Jun  2 08:59:39 2006
@@ -186,10 +186,12 @@
       <groupId>woodstox</groupId>
       <artifactId>wstx-asl</artifactId>
     </dependency>
+    <!--  
     <dependency>
       <groupId>stax-utils</groupId>
       <artifactId>stax-utils</artifactId>
     </dependency>
+    -->
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -232,6 +234,17 @@
       <groupId>ant</groupId>
       <artifactId>ant</artifactId>
       <optional>true</optional>
+    </dependency>
+    
+    <dependency>
+      <groupId>tranql</groupId>
+      <artifactId>tranql-connector</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/AutoDeploymentService.java Fri Jun  2 08:59:39 2006
@@ -456,7 +456,7 @@
     }
 
     private void scheduleDirectoryTimer(){
-        if(!container.isEmbedded()&&(isMonitorInstallationDirectory()||isMonitorDeploymentDirectory())){
+        if (!container.isEmbedded() && (isMonitorInstallationDirectory() || isMonitorDeploymentDirectory())) {
             if(statsTimer==null){
                 statsTimer=new Timer(true);
             }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/management/MBeanServerContext.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/management/MBeanServerContext.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/management/MBeanServerContext.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/management/MBeanServerContext.java Fri Jun  2 08:59:39 2006
@@ -15,12 +15,405 @@
  */
 package org.apache.servicemix.jbi.management;
 
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.rmi.registry.LocateRegistry;
+import java.util.List;
+
+import javax.management.Attribute;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
 
 /**
  *Wrapper around ActiveMQ ManagementContext. Re-use to build/find mbean server
  * 
  * @version $Revision: 372580 $
  */
-class MBeanServerContext extends org.apache.activemq.broker.jmx.ManagementContext  {
-    
+class MBeanServerContext {
+
+    /**
+     * Default activemq domain
+     */
+    public static final String DEFAULT_DOMAIN="org.apache.activemq";
+    private final static Log log=LogFactory.getLog(ManagementContext.class);
+    private MBeanServer beanServer;
+    private String jmxDomainName=DEFAULT_DOMAIN;
+    private boolean useMBeanServer=true;
+    private boolean createMBeanServer=true;
+    private boolean locallyCreateMBeanServer=false;
+    private boolean createConnector=true;
+    private boolean findTigerMbeanServer=false;
+    private int connectorPort=1099;
+    private String connectorPath="/jmxrmi";
+    private AtomicBoolean started=new AtomicBoolean(false);
+    private JMXConnectorServer connectorServer;
+    private ObjectName namingServiceObjectName;
+
+    public MBeanServerContext(){
+        this(null);
+    }
+
+    public MBeanServerContext(MBeanServer server){
+        this.beanServer=server;
+    }
+
+    public void start() throws IOException {
+        // lets force the MBeanServer to be created if needed
+        if (started.compareAndSet(false, true)) {
+            getMBeanServer();
+            if (connectorServer != null) {
+                try {
+                    getMBeanServer().invoke(namingServiceObjectName, "start", null, null);
+                }
+                catch (Throwable ignore) {
+                }
+                Thread t = new Thread("JMX connector") {
+                    public void run() {
+                        try {
+                            JMXConnectorServer server = connectorServer;
+                            if (started.get() && server != null) {
+                                server.start();
+                                log.info("JMX consoles can connect to " + server.getAddress());
+                            }
+                        }
+                        catch (IOException e) {
+                            log.warn("Failed to start jmx connector: " + e.getMessage());
+                        }
+                    }
+                };
+                t.setDaemon(true);
+                t.start();
+            }
+        }
+    }
+
+    public void stop() throws IOException {
+        if (started.compareAndSet(true, false)) {
+            JMXConnectorServer server = connectorServer;
+            connectorServer = null;
+            if (server != null) {
+                try {
+                    server.stop();
+                }
+                catch (IOException e) {
+                    log.warn("Failed to stop jmx connector: " + e.getMessage());
+                }
+                try {
+                    getMBeanServer().invoke(namingServiceObjectName, "stop", null, null);
+                }
+                catch (Throwable ignore) {
+                }
+            }
+            if (locallyCreateMBeanServer && beanServer != null) {
+                // check to see if the factory knows about this server
+                List list = MBeanServerFactory.findMBeanServer(null);
+                if (list != null && !list.isEmpty() && list.contains(beanServer)) {
+                    MBeanServerFactory.releaseMBeanServer(beanServer);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Returns the jmxDomainName.
+     */
+    public String getJmxDomainName(){
+        return jmxDomainName;
+    }
+
+    /**
+     * @param jmxDomainName
+     *            The jmxDomainName to set.
+     */
+    public void setJmxDomainName(String jmxDomainName){
+        this.jmxDomainName=jmxDomainName;
+    }
+
+    /**
+     * Get the MBeanServer
+     * 
+     * @return the MBeanServer
+     */
+    public MBeanServer getMBeanServer(){
+        if(this.beanServer==null){
+            this.beanServer=findMBeanServer();
+        }
+        return beanServer;
+    }
+
+    /**
+     * Set the MBeanServer
+     * 
+     * @param beanServer
+     */
+    public void setMBeanServer(MBeanServer beanServer){
+        this.beanServer=beanServer;
+    }
+
+    /**
+     * @return Returns the useMBeanServer.
+     */
+    public boolean isUseMBeanServer(){
+        return useMBeanServer;
+    }
+
+    /**
+     * @param useMBeanServer
+     *            The useMBeanServer to set.
+     */
+    public void setUseMBeanServer(boolean useMBeanServer){
+        this.useMBeanServer=useMBeanServer;
+    }
+
+    /**
+     * @return Returns the createMBeanServer flag.
+     */
+    public boolean isCreateMBeanServer(){
+        return createMBeanServer;
+    }
+
+    /**
+     * @param enableJMX
+     *            Set createMBeanServer.
+     */
+    public void setCreateMBeanServer(boolean enableJMX){
+        this.createMBeanServer=enableJMX;
+    }
+
+    public boolean isFindTigerMbeanServer() {
+        return findTigerMbeanServer;
+    }
+
+    /**
+     * Enables/disables the searching for the Java 5 platform MBeanServer
+     */
+    public void setFindTigerMbeanServer(boolean findTigerMbeanServer) {
+        this.findTigerMbeanServer = findTigerMbeanServer;
+    }
+
+    /**
+     * Formulate and return the MBean ObjectName of a custom control MBean
+     * 
+     * @param type
+     * @param name
+     * @return the JMX ObjectName of the MBean, or <code>null</code> if <code>customName</code> is invalid.
+     */
+    public ObjectName createCustomComponentMBeanName(String type,String name){
+        ObjectName result=null;
+        String tmp=jmxDomainName+":"+"type="+sanitizeString(type)+",name="+sanitizeString(name);
+        try{
+            result=new ObjectName(tmp);
+        }catch(MalformedObjectNameException e){
+            log.error("Couldn't create ObjectName from: "+type+" , "+name);
+        }
+        return result;
+    }
+
+    /**
+     * The ':' and '/' characters are reserved in ObjectNames
+     * 
+     * @param in
+     * @return sanitized String
+     */
+    private static String sanitizeString(String in){
+        String result=null;
+        if(in!=null){
+            result=in.replace(':','_');
+            result=result.replace('/','_');
+            result=result.replace('\\','_');
+        }
+        return result;
+    }
+
+    /**
+     * Retrive an System ObjectName
+     * 
+     * @param domainName
+     * @param containerName
+     * @param theClass
+     * @return the ObjectName
+     * @throws MalformedObjectNameException
+     */
+    public static ObjectName getSystemObjectName(String domainName,String containerName,Class theClass)
+                    throws MalformedObjectNameException,NullPointerException{
+        String tmp=domainName+":"+"type="+theClass.getName()+",name="+getRelativeName(containerName,theClass);
+        return new ObjectName(tmp);
+    }
+
+    private static String getRelativeName(String containerName,Class theClass){
+        String name=theClass.getName();
+        int index=name.lastIndexOf(".");
+        if(index>=0&&(index+1)<name.length()){
+            name=name.substring(index+1);
+        }
+        return containerName+"."+name;
+    }
+
+    /**
+     * Unregister an MBean
+     * 
+     * @param name
+     * @throws JMException
+     */
+    public void unregisterMBean(ObjectName name) throws JMException{
+        if(beanServer!=null&&beanServer.isRegistered(name)){
+            beanServer.unregisterMBean(name);
+        }
+    }
+
+    protected synchronized MBeanServer findMBeanServer(){
+        MBeanServer result=null;
+        // create the mbean server
+        try{
+            if(useMBeanServer){
+                if (findTigerMbeanServer) {
+                    result = findTigerMBeanServer();
+                }
+                if (result == null) {
+                    // lets piggy back on another MBeanServer -
+                    // we could be in an appserver!
+                    List list=MBeanServerFactory.findMBeanServer(null);
+                    if(list!=null&&list.size()>0){
+                        result=(MBeanServer) list.get(0);
+                    }
+                }
+            }
+            if(result==null&&createMBeanServer){
+                result=createMBeanServer();
+            }
+        }catch(NoClassDefFoundError e){
+            log.error("Couldnot load MBeanServer",e);
+        }catch(Throwable e){
+            // probably don't have access to system properties
+            log.error("Failed to initialize MBeanServer",e);
+        }
+        return result;
+    }
+
+    public static MBeanServer findTigerMBeanServer() {
+        String name = "java.lang.management.ManagementFactory";
+        Class type = loadClass(name, ManagementContext.class.getClassLoader());
+        if (type != null) {
+            try {
+                Method method = type.getMethod("getPlatformMBeanServer", new Class[0]);
+                if (method != null) {
+                    Object answer = method.invoke(null, new Object[0]);
+                    if (answer instanceof MBeanServer) {
+                        return (MBeanServer) answer;
+                    }
+                    else {
+                        log.warn("Could not cast: " + answer + " into an MBeanServer. There must be some classloader strangeness in town");
+                    }
+                }
+                else {
+                    log.warn("Method getPlatformMBeanServer() does not appear visible on type: " + type.getName());
+                }
+            }
+            catch (Exception e) {
+                log.warn("Failed to call getPlatformMBeanServer() due to: " + e, e);
+            }
+        }
+        else {
+            log.trace("Class not found: " + name + " so probably running on Java 1.4");
+        }
+        return null;
+    }
+
+    private static Class loadClass(String name, ClassLoader loader) {
+        try {
+            return loader.loadClass(name);
+        }
+        catch (ClassNotFoundException e) {
+            try {
+                return Thread.currentThread().getContextClassLoader().loadClass(name);
+            }
+            catch (ClassNotFoundException e1) {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * @return
+     * @throws NullPointerException
+     * @throws MalformedObjectNameException
+     * @throws IOException
+     */
+    protected MBeanServer createMBeanServer() throws MalformedObjectNameException,IOException{
+        MBeanServer mbeanServer=MBeanServerFactory.createMBeanServer(jmxDomainName);
+        locallyCreateMBeanServer=true;
+        if(createConnector){
+            createConnector(mbeanServer);
+        }
+        return mbeanServer;
+    }
+
+    /**
+     * @param mbeanServer
+     * @throws MalformedObjectNameException
+     * @throws MalformedURLException
+     * @throws IOException
+     */
+    private void createConnector(MBeanServer mbeanServer) throws MalformedObjectNameException,MalformedURLException,
+                    IOException{
+        // Create the NamingService, needed by JSR 160
+        try{
+            LocateRegistry.createRegistry(connectorPort);
+            namingServiceObjectName=ObjectName.getInstance("naming:type=rmiregistry");
+            // Do not use the createMBean as the mx4j jar may not be in the
+            // same class loader than the server
+            Class cl=Class.forName("mx4j.tools.naming.NamingService");
+            mbeanServer.registerMBean(cl.newInstance(),namingServiceObjectName);
+            // mbeanServer.createMBean("mx4j.tools.naming.NamingService", namingServiceObjectName, null);
+            // set the naming port
+            Attribute attr=new Attribute("Port",new Integer(connectorPort));
+            mbeanServer.setAttribute(namingServiceObjectName,attr);
+            mbeanServer.invoke(namingServiceObjectName, "start", null, null);
+        }catch(Throwable e){
+            log.debug("Failed to create local registry",e);
+        }
+        // Create the JMXConnectorServer
+        String serviceURL="service:jmx:rmi:///jndi/rmi://localhost:"+connectorPort+connectorPath;
+        JMXServiceURL url=new JMXServiceURL(serviceURL);
+        connectorServer=JMXConnectorServerFactory.newJMXConnectorServer(url,null,mbeanServer);
+        // log.info("JMX consoles can connect to serviceURL: " + serviceURL);
+    }
+
+    public String getConnectorPath(){
+        return connectorPath;
+    }
+
+    public void setConnectorPath(String connectorPath){
+        this.connectorPath=connectorPath;
+    }
+
+    public int getConnectorPort(){
+        return connectorPort;
+    }
+
+    public void setConnectorPort(int connectorPort){
+        this.connectorPort=connectorPort;
+    }
+
+    public boolean isCreateConnector(){
+        return createConnector;
+    }
+
+    public void setCreateConnector(boolean createConnector){
+        this.createConnector=createConnector;
+    }
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Fri Jun  2 08:59:39 2006
@@ -21,6 +21,7 @@
 
 import javax.jbi.JBIException;
 import javax.jbi.component.Component;
+import javax.jbi.component.ComponentLifeCycle;
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
@@ -67,6 +68,12 @@
     private long lastReceiveTime = System.currentTimeMillis();
     private AtomicBoolean closed = new AtomicBoolean(false);
     private Map waiters = new ConcurrentHashMap();
+    
+    /**
+     * When using clustering and sendSync, the exchange received will not be the same
+     * as the one sent (because it has been serialized/deserialized.
+     * We thus need to keep the original exchange in a map and override its state.
+     */
     private Map exchangesById = new ConcurrentHashMap();
 
     /**
@@ -110,16 +117,14 @@
      */
     public void close() throws MessagingException {
         if (this.closed.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Closing DeliveryChannel " + this);
+            }
             List pending = queue.closeAndFlush();
             for (Iterator iter = pending.iterator(); iter.hasNext();) {
                 MessageExchangeImpl messageExchange = (MessageExchangeImpl) iter.next();
                 if (messageExchange.getTransactionContext() != null && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
-                    synchronized (messageExchange.getMirror()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Notifying: " + messageExchange.getExchangeId());
-                        }
-                        messageExchange.getMirror().notify();
-                    }
+                    notifyExchange(messageExchange.getMirror(), messageExchange.getMirror(), "close");
                 }
             }
             // Interrupt all blocked thread
@@ -143,7 +148,7 @@
 
     protected void checkNotClosed() throws MessagingException {
         if (closed.get()) {
-            throw new MessagingException("DeliveryChannel has been closed.");
+            throw new MessagingException(this + " has been closed.");
         }
     }
 
@@ -244,25 +249,7 @@
      * @throws MessagingException
      */
     public MessageExchange accept() throws MessagingException {
-        try {
-            checkNotClosed();
-            MessageExchangeImpl me = (MessageExchangeImpl) queue.take();
-            if (log.isDebugEnabled()) {
-                log.debug("Accepting " + me.getExchangeId() + " in " + this);
-            }
-            resumeTx(me);
-            me.handleAccept();
-            if (log.isTraceEnabled()) {
-                log.trace("Accepted: " + me);
-            }
-            return me;
-        }
-        catch (IllegalStateException e) {
-            throw new MessagingException("DeliveryChannel has been closed.");
-        }
-        catch (InterruptedException e) {
-            throw new MessagingException("accept failed", e);
-        }
+        return accept(Long.MAX_VALUE);
     }
 
     /**
@@ -288,10 +275,29 @@
                     if (log.isDebugEnabled()) {
                         log.debug("Accepting " + me.getExchangeId() + " in " + this);
                     }
-                    resumeTx(me);
-                    me.handleAccept();
-                    if (log.isTraceEnabled()) {
-                        log.trace("Accepted: " + me);
+                    // If we have a tx lock and the exchange is not active, we need
+                    // to notify here without resuming transaction
+                    if (me.getTxLock() != null && me.getStatus() != ExchangeStatus.ACTIVE) {
+                        notifyExchange(me.getMirror(), me.getTxLock(), "acceptFinishedExchangeWithTxLock");
+                        me.handleAccept();
+                        if (log.isTraceEnabled()) {
+                            log.trace("Accepted: " + me);
+                        }
+                    }
+                    // We transactionnaly deliver a finished exchange
+                    else if (me.isTransacted() && me.getStatus() != ExchangeStatus.ACTIVE) {
+                        // Do not resume transaction
+                        me.handleAccept();
+                        if (log.isTraceEnabled()) {
+                            log.trace("Accepted: " + me);
+                        }
+                    } 
+                    else {
+                        resumeTx(me);
+                        me.handleAccept();
+                        if (log.isTraceEnabled()) {
+                            log.trace("Accepted: " + me);
+                        }
                     }
                 }
             }
@@ -301,101 +307,86 @@
             throw new MessagingException("accept failed", e);
         }
     }
+    
+    protected void autoSetPersistent(MessageExchangeImpl me) {
+        Boolean persistent = me.getPersistent();
+        if (persistent == null) {
+            if (context.getActivationSpec().getPersistent() != null) {
+                persistent = context.getActivationSpec().getPersistent();
+            } else {
+                persistent = Boolean.valueOf(context.getContainer().isPersistent());
+            }
+            me.setPersistent(persistent);
+        }
+    }
+    
+    protected void throttle() {
+        if (component.isExchangeThrottling()) {
+            if (component.getThrottlingInterval() > intervalCount) {
+                intervalCount = 0;
+                try {
+                    Thread.sleep(component.getThrottlingTimeout());
+                }
+                catch (InterruptedException e) {
+                    log.warn("throttling failed", e);
+                }
+            }
+            intervalCount++;
+        }
+    }
 
-    protected void doSend(MessageExchangeImpl messageExchange, boolean sync) throws MessagingException {
+    protected void doSend(MessageExchangeImpl me, boolean sync) throws MessagingException {
+        MessageExchangeImpl mirror = me.getMirror();
+        boolean finished = me.getStatus() != ExchangeStatus.ACTIVE;
         try {
             if (log.isTraceEnabled()) {
-                log.trace("Sent: " + messageExchange);
+                log.trace("Sent: " + me);
             }
-            // If the delivery channel has been closed
-            checkNotClosed();
             // If the message has timed out
-            if (messageExchange.getPacket().isAborted()) {
-                throw new ExchangeTimeoutException(messageExchange);
+            if (me.getPacket().isAborted()) {
+                throw new ExchangeTimeoutException(me);
             }
             // Auto enlist exchange in transaction
-            autoEnlistInTx(messageExchange);
+            autoEnlistInTx(me);
             // Update persistence info
-            Boolean persistent = messageExchange.getPersistent();
-            if (persistent == null) {
-                if (context.getActivationSpec().getPersistent() != null) {
-                    persistent = context.getActivationSpec().getPersistent();
-                } else {
-                    persistent = Boolean.valueOf(context.getContainer().isPersistent());
-                }
-                messageExchange.setPersistent(persistent);
-            }
-
-            if (component.isExchangeThrottling()) {
-                if (component.getThrottlingInterval() > intervalCount) {
-                    intervalCount = 0;
-                    try {
-                        Thread.sleep(component.getThrottlingTimeout());
-                    }
-                    catch (InterruptedException e) {
-                        log.warn("throttling failed", e);
-                    }
-                }
-                intervalCount++;
-            }
-
+            autoSetPersistent(me);
+            // Throttle if needed
+            throttle();
             // Update stats
-            MessagingStats messagingStats = component.getMessagingStats();
-            long currentTime = System.currentTimeMillis();
-            if (container.isNotifyStatistics()) {
-                long oldCount = messagingStats.getOutboundExchanges().getCount();
-                messagingStats.getOutboundExchanges().increment();
-                component.firePropertyChanged(
-                        "outboundExchangeCount",
-                        new Long(oldCount),
-                        new Long(messagingStats.getOutboundExchanges().getCount()));
-                double oldRate = messagingStats.getOutboundExchangeRate().getAverageTime();
-                messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
-                component.firePropertyChanged("outboundExchangeRate",
-                        new Double(oldRate),
-                        new Double(messagingStats.getOutboundExchangeRate().getAverageTime()));
-            } else {
-                messagingStats.getOutboundExchanges().increment();
-                messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
+            incrementOutboundStats();
+            // Store the consumer component
+            if (me.getRole() == Role.CONSUMER) {
+                me.setSourceId(component.getComponentNameSpace());
             }
-            lastSendTime = currentTime;
-
-            if (messageExchange.getRole() == Role.CONSUMER) {
-                messageExchange.setSourceId(component.getComponentNameSpace());
-            }
-
             // Call the listeners before the ownership changes
-            container.callListeners(messageExchange);
-            messageExchange.handleSend(sync);
-            container.sendExchange(messageExchange.getMirror());
+            container.callListeners(me);
+            me.handleSend(sync);
+            mirror.setTxState(MessageExchangeImpl.TX_STATE_NONE);
+            // If this is the DONE or ERROR status from a synchronous transactional exchange,
+            // it should not be part of the transaction, so remove the tx context
+            if (finished && 
+                me.getTxLock() == null &&
+                me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED &&
+                me.isPushDelivery() == false) {
+                me.setTransactionContext(null);
+            }
+            container.sendExchange(mirror);
         } catch (MessagingException e) {
             if (log.isDebugEnabled()) {
-                log.debug("Exception processing: " + messageExchange.getExchangeId() + " in " + this);
+                log.debug("Exception processing: " + me.getExchangeId() + " in " + this);
             }
             throw e;
         } finally {
-            if (messageExchange.getTransactionContext() != null) {
-                if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
-                    suspendTx(messageExchange);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Notifying: " + messageExchange.getExchangeId() + " in " + this);
-                    }
-                    synchronized (messageExchange.getMirror()) {
-                        messageExchange.getMirror().notify();
-                    }
+            // If there is a tx lock, we need to suspend and notify
+            if (me.getTxLock() != null) {
+                if (mirror.getTxState() == MessageExchangeImpl.TX_STATE_ENLISTED) {
+                    suspendTx(mirror);
+                }
+                synchronized (me.getTxLock()) {
+                    notifyExchange(me, me.getTxLock(), "doSendWithTxLock");
                 }
             }
         }
-
-        /*
-        if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
-            synchronized (messageExchange.getMirror()) {
-                suspendTx(messageExchange);
-                messageExchange.getMirror().setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
-                messageExchange.getMirror().notify();
-            }
-        }
-        */
     }
 
     /**
@@ -405,9 +396,17 @@
      * @throws MessagingException
      */
     public void send(MessageExchange messageExchange) throws MessagingException {
+        // If the delivery channel has been closed
+        checkNotClosed();
+        // Log call
+        if (log.isDebugEnabled()) {
+            log.debug("Send " + messageExchange.getExchangeId() + " in " + this);
+        }
+        // // JBI 5.5.2.1.3: remove sync property
         messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
-        MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
-        doSend(messageExchangeImpl, false);
+        // Call doSend
+        MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
+        doSend(me, false);
     }
 
     /**
@@ -418,7 +417,7 @@
      * @throws MessagingException
      */
     public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
-        return sendSync(messageExchange, Long.MAX_VALUE);
+        return sendSync(messageExchange, 0);
     }
 
     /**
@@ -429,42 +428,55 @@
      * @return true if processed
      * @throws MessagingException
      */
-    public boolean sendSync(MessageExchange messageExchange, long timeoutMS) throws MessagingException {
-        boolean result = false;
+    public boolean sendSync(MessageExchange messageExchange, long timeout) throws MessagingException {
+        // If the delivery channel has been closed
+        checkNotClosed();
+        // Log call
         if (log.isDebugEnabled()) {
-            log.debug("Sending " + messageExchange.getExchangeId() + " in " + this);
+            log.debug("SendSync " + messageExchange.getExchangeId() + " in " + this);
         }
+        boolean result = false;
         // JBI 5.5.2.1.3: set the sendSync property
         messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
-        MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
-        exchangesById.put(messageExchange.getExchangeId(), messageExchange);
+        // Call doSend
+        MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
         try {
+            exchangesById.put(me.getExchangeId(), me);
             // Synchronously send a message and wait for the response
-            synchronized (messageExchangeImpl) {
-                doSend(messageExchangeImpl, true);
-                if (messageExchangeImpl.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
-                    waiters.put(Thread.currentThread(), Boolean.TRUE);
-                    try {
-                        messageExchangeImpl.wait(timeoutMS);
-                    } finally {
-                        waiters.remove(Thread.currentThread());
+            synchronized (me) {
+                doSend(me, true);
+                if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
+                    waitForExchange(me, me, timeout, "sendSync");
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Exchange " + messageExchange.getExchangeId() + " has already been answered (no need to wait)");
                     }
                 }
             }
-            if (messageExchangeImpl.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
-                messageExchangeImpl.handleAccept();
-                resumeTx(messageExchangeImpl);
-                result= true;
+            if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
+                me.handleAccept();
+                // If the sender flag has been removed, it means
+                // the message has been delivered in the same thread
+                // so there is no need to resume the transaction
+                // See processInBound
+                //if (messageExchangeImpl.getSyncSenderThread() != null) {
+                    resumeTx(me);
+                //}
+                result = true;
             } else {
                 // JBI 5.5.2.1.3: the exchange should be set to ERROR status
-                messageExchangeImpl.getPacket().setAborted(true);
-                result =  false;
+                if (log.isDebugEnabled()) {
+                    log.debug("Exchange " + messageExchange.getExchangeId() + " has been aborted");
+                }
+                me.getPacket().setAborted(true);
+                result = false;
             }
         } catch (InterruptedException e) {
-            exchangesById.remove(messageExchange.getExchangeId());
             throw new MessagingException(e);
-        }
-        finally{
+        } catch (RuntimeException e) {
+            e.printStackTrace();
+            throw e;
+        } finally {
             exchangesById.remove(messageExchange.getExchangeId());
         }
         return result;
@@ -509,16 +521,7 @@
         this.context = context;
     }
 
-    /**
-     * Used internally for passing in a MessageExchange
-     * 
-     * @param me
-     * @throws MessagingException
-     */
-    public void processInBound(MessageExchangeImpl me) throws MessagingException {
-        checkNotClosed();
-
-        // Update stats
+    protected void incrementInboundStats() {
         MessagingStats messagingStats = component.getMessagingStats();
         long currentTime = System.currentTimeMillis();
         if (container.isNotifyStatistics()) {
@@ -538,69 +541,179 @@
             messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime);
         }
         lastReceiveTime = currentTime;
+    }
+    
+    protected void incrementOutboundStats() {
+        MessagingStats messagingStats = component.getMessagingStats();
+        long currentTime = System.currentTimeMillis();
+        if (container.isNotifyStatistics()) {
+            long oldCount = messagingStats.getOutboundExchanges().getCount();
+            messagingStats.getOutboundExchanges().increment();
+            component.firePropertyChanged(
+                    "outboundExchangeCount",
+                    new Long(oldCount),
+                    new Long(messagingStats.getOutboundExchanges().getCount()));
+            double oldRate = messagingStats.getOutboundExchangeRate().getAverageTime();
+            messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
+            component.firePropertyChanged("outboundExchangeRate",
+                    new Double(oldRate),
+                    new Double(messagingStats.getOutboundExchangeRate().getAverageTime()));
+        } else {
+            messagingStats.getOutboundExchanges().increment();
+            messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
+        }
+        lastSendTime = currentTime;
+    }
+    
+    /**
+     * Used internally for passing in a MessageExchange
+     * 
+     * @param me
+     * @throws MessagingException
+     */
+    public void processInBound(MessageExchangeImpl me) throws MessagingException {
+        if (log.isTraceEnabled()) {
+            log.trace("Processing inbound exchange: " + me);
+        }
+        // Check if the delivery channel has been closed
+        checkNotClosed();
+        // Update stats
+        incrementInboundStats();
 
-        // If the message has been sent synchronously
-        // this is the answer, so update the syncState and notify the waiter
-        // Here, we don't need to put the message in the queue
-        MessageExchangeImpl theOriginal = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
-        if (theOriginal != null && theOriginal.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT &&
-            theOriginal.getRole() == me.getRole()) {
-            suspendTx(theOriginal);
-            synchronized (theOriginal) {
-                theOriginal.copyFrom(me);
-                theOriginal.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
-                theOriginal.notify();
+        // Retrieve the original exchange sent
+        MessageExchangeImpl original = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
+        if (original != null && me != original) {
+            original.copyFrom(me);
+            me = original;
+        }
+        // Check if the incoming exchange is a response to a synchronous exchange previously sent
+        // In this case, we do not have to queue it, but rather notify the waiting thread.
+        if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+            // If the mirror has been delivered using push, better wait until
+            // the push call return.  This can only work if not using clustered flows,
+            // but the flag is transient so we do not care.
+            /*if (!me.getMirror().isPushDelivery())*/ {
+                // Ensure that data is uptodate with the incoming exchange (in case the exchange has
+                // been serialized / deserialized by a clustered flow)
+                suspendTx(original);
+                me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
+                notifyExchange(original, original, "processInboundSynchronousExchange");
+            }
+            return;
+        }
+
+        // If the component implements the MessageExchangeListener,
+        // the delivery can be made synchronously, so we don't need
+        // to bother with transactions
+        MessageExchangeListener listener = getExchangeListener();
+        if (listener != null) {
+            me.handleAccept();
+            if (log.isTraceEnabled()) {
+                log.trace("Received: " + me);
             }
-        } else {
-            Component component = this.component.getComponent();
-            // If the component implements the MessageExchangeListener,
-            // the delivery can be made synchronously, so we don't need
-            // to bother about transactions
-            if (component != null && component instanceof MessageExchangeListener) {
-                me.handleAccept();
-                if (log.isTraceEnabled()) {
-                    log.trace("Received: " + me);
+            // Set the flag the the exchange was delivered using push mode
+            // This is important for transaction boundaries
+            me.setPushDeliver(true);
+            // Deliver the exchange
+            listener.onMessageExchange(me);
+            // TODO: handle delayed exchange notifications 
+            return;
+        }
+        
+        // Component uses pull delivery.
+        
+        // If the exchange is transacted, special care should be taken.
+        // But if the exchange is no more ACTIVE, just queue it, as
+        // we will never have an answer back.
+        if (me.isTransacted() && me.getStatus() == ExchangeStatus.ACTIVE) {
+            // If the transaction is conveyed by the exchange
+            // We do not need to resume the transaction in this thread 
+            if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) {
+                try {
+                    suspendTx(me);
+                    queue.put(me);
+                } catch (InterruptedException e) {
+                    log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
+                    me.getPacket().setAborted(true);
                 }
-                ((MessageExchangeListener) component).onMessageExchange(me);
             }
+            // Else the delivery / send are enlisted in the current tx.
+            // We must suspend the transaction, queue it, and wait for the answer
+            // to be sent, at which time the tx should be suspended and resumed in
+            // this thread.
             else {
-                // Component uses async delivery
-                try {
-                    if (me.isTransacted() && me.getStatus() == ExchangeStatus.DONE) {
-                        // Do nothing in this case
-                    } else if (me.isTransacted() && me.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_ASYNC) {
-                        suspendTx(me);
-                        synchronized (me.getMirror()) {
-                            me.getMirror().setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_SENT);
-                            if (log.isDebugEnabled()) {
-                                log.debug("Queuing: " + me.getExchangeId() + " in " + this);
-                            }
-                            queue.put(me);
-                            if (log.isDebugEnabled()) {
-                                log.debug("Waiting: " + me.getExchangeId() + " in " + this);
-                            }
-                            // If the channel is closed while here,
-                            // we must abort
-                            waiters.put(Thread.currentThread(), Boolean.TRUE);
-                            try {
-                                me.getMirror().wait();
-                            } finally {
-                                waiters.remove(Thread.currentThread());
-                            }
-                            if (log.isDebugEnabled()) {
-                                log.debug("Notified: " + me.getExchangeId() + " in " + this);
-                            }
-                        }
-                        resumeTx(me);
-                    } else {
+                Object lock = new Object();
+                synchronized (lock) {
+                    try {
+                        me.setTxLock(lock);
                         suspendTx(me);
                         queue.put(me);
+                        waitForExchange(me, lock, 0, "processInboundTransactionalExchange");
+                    } catch (InterruptedException e) {
+                        log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
+                        me.getPacket().setAborted(true);
+                    } finally {
+                        me.setTxLock(null);
+                        resumeTx(me);
                     }
-                } catch (InterruptedException e) {
-                    throw new MessagingException(e);
                 }
             }
         }
+        // If the exchange is ACTIVE, the transaction boundary will suspended when the 
+        // answer is sent
+        // Else just queue the exchange
+        else {
+            try {
+                queue.put(me);
+            } catch (InterruptedException e) {
+                log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
+                me.getPacket().setAborted(true);
+            }
+        }
+    }
+    
+    protected MessageExchangeListener getExchangeListener() {
+        Component component = this.component.getComponent();
+        if (component instanceof MessageExchangeListener) {
+            return (MessageExchangeListener) component;
+        }
+        ComponentLifeCycle lifecycle = this.component.getLifeCycle();
+        if (lifecycle instanceof MessageExchangeListener) {
+            return (MessageExchangeListener) lifecycle;
+        }
+        return null;
+    }
+    
+    /**
+     * Synchronization must be performed on the given exchange when calling this method
+     * 
+     * @param me
+     * @throws InterruptedException
+     */
+    protected void waitForExchange(MessageExchangeImpl me, Object lock, long timeout, String from) throws InterruptedException {
+        // If the channel is closed while here, we must abort
+        if (log.isDebugEnabled()) {
+            log.debug("Waiting for exchange " + me.getExchangeId() + " (" + Integer.toHexString(me.hashCode()) + ") to be answered in " + this + " from " + from);
+        }
+        Thread th = Thread.currentThread();
+        try {
+            waiters.put(th, Boolean.TRUE);
+            lock.wait(timeout);
+        } finally {
+            waiters.remove(th);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Notified: " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from);
+        }
+    }
+    
+    protected void notifyExchange(MessageExchangeImpl me, Object lock, String from) {
+        if (log.isDebugEnabled()) {
+            log.debug("Notifying exchange " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from);
+        }
+        synchronized (lock) {
+            lock.notify();
+        }
     }
 
     /**
@@ -615,7 +728,7 @@
         return inboundFactory;
     }
 
-    protected void suspendTx(MessageExchangeImpl me) throws MessagingException {
+    protected void suspendTx(MessageExchangeImpl me) {
         try {
             Transaction oldTx = me.getTransactionContext();
             if (oldTx != null) {
@@ -631,7 +744,8 @@
                 }
             }
         } catch (Exception e) {
-            throw new MessagingException(e);
+            log.info("Exchange " + me.getExchangeId() + " aborted due to transaction exception", e);
+            me.getPacket().setAborted(true);
         }
     }
 

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java Fri Jun  2 08:59:39 2006
@@ -53,6 +53,23 @@
     public static final int SYNC_STATE_ASYNC = 0;
     public static final int SYNC_STATE_SYNC_SENT = 1;
     public static final int SYNC_STATE_SYNC_RECEIVED = 2;
+    
+    /**
+     * Exchange is not transactional 
+     */
+    public static final int TX_STATE_NONE = 0;
+    /**
+     * Exchange has been enlisted in the current transaction.
+     * This means that the transaction must be commited for
+     * the exchange to be delivered.
+     */
+    public static final int TX_STATE_ENLISTED = 1;
+    /**
+     * Transaction is being conveyed by the exchange.
+     * The transaction context will be given to the
+     * target component.
+     */
+    public static final int TX_STATE_CONVEYED = 2;
 
     protected static final int CAN_SET_IN_MSG               = 0x00000001;
     protected static final int CAN_SET_OUT_MSG              = 0x00000002;
@@ -84,8 +101,11 @@
     protected PojoMarshaler marshaler;
     protected int state;
     protected int syncState = SYNC_STATE_ASYNC;
+    protected int txState = TX_STATE_NONE;
     protected int[][] states;
     protected MessageExchangeImpl mirror;
+    protected transient boolean pushDeliver;
+    protected transient Object txLock;
 
     /**
      * Constructor
@@ -108,10 +128,12 @@
     }
     
     protected void copyFrom(MessageExchangeImpl me) {
-        this.packet = me.packet;
-        this.state = me.state;
-        this.mirror.packet = me.packet;
-        this.mirror.state = me.mirror.state;
+        if (this != me) {
+            this.packet = me.packet;
+            this.state = me.state;
+            this.mirror.packet = me.packet;
+            this.mirror.state = me.mirror.state;
+        }
     }
     
     protected boolean can(int c) {
@@ -599,18 +621,54 @@
         this.state = nextState;
     }
 
-    public int getSyncState() {
-        return syncState;
-    }
-
     public MessageExchangeImpl getMirror() {
         return mirror;
     }
 
+    public int getSyncState() {
+        return syncState;
+    }
+
     public void setSyncState(int syncState) {
         this.syncState = syncState;
     }
     
+    /**
+     * @return the txState
+     */
+    public int getTxState() {
+        return txState;
+    }
+
+    /**
+     * @param txState the txState to set
+     */
+    public void setTxState(int txState) {
+        this.txState = txState;
+    }
+
+    public boolean isPushDelivery() {
+        return this.pushDeliver;
+    }
+    
+    public void setPushDeliver(boolean b) {
+        this.pushDeliver = true;
+    }
+    
+    /**
+     * @return the txLock
+     */
+    public Object getTxLock() {
+        return txLock;
+    }
+
+    /**
+     * @param txLock the txLock to set
+     */
+    public void setTxLock(Object txLock) {
+        this.txLock = txLock;
+    }
+
     public String toString() {
         try {
             int maxSize = 1500;
@@ -685,5 +743,5 @@
             return null;
         }
     }
-    
+
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/DefaultBroker.java Fri Jun  2 08:59:39 2006
@@ -131,11 +131,11 @@
             flows = new Flow[names.length];
             for (int i = 0; i < names.length; i++) {
                 flows[i] = FlowProvider.getFlow(names[i]);
-                flows[i].init(this, names[i]);
+                flows[i].init(this);
             }
         } else {
             for (int i = 0; i < flows.length; i++) {
-                flows[i].init(this, null);
+                flows[i].init(this);
             }
         }
     	subscriptionManager.init(this, registry);

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Fri Jun  2 08:59:39 2006
@@ -60,9 +60,8 @@
      * @param broker
      * @throws JBIException
      */
-    public void init(Broker broker, String name) throws JBIException {
+    public void init(Broker broker) throws JBIException {
         this.broker = broker;
-        this.name = name;
 		// register self with the management context
         ObjectName objectName = broker.getContainer().getManagementContext().createObjectName(this);
         try {
@@ -160,9 +159,7 @@
      * @param packet
      * @throws MessagingException
      */
-    public void doRouting(MessageExchangeImpl me) throws MessagingException {
-    	if (log.isDebugEnabled())
-    		log.debug("Called Flow doRouting");
+    protected void doRouting(MessageExchangeImpl me) throws MessagingException {
         ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
         //As the MessageExchange could come from another container - ensure we get the local Component
         ComponentMBeanImpl lcc = broker.getContainer().getRegistry().getComponent(id.getName());
@@ -256,6 +253,10 @@
         } else {
             return this.name;
         }
+    }
+    
+    public void setName(String name) {
+        this.name = name;
     }
     
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java Fri Jun  2 08:59:39 2006
@@ -16,36 +16,39 @@
 package org.apache.servicemix.jbi.nmr.flow;
 
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 
 public class DefaultFlowChooser implements FlowChooser {
 
-    private static final Log log = LogFactory.getLog(DefaultFlowChooser.class); 
-    
     public DefaultFlowChooser() {
     }
 
-    public Flow chooseFlow(Flow[] flows, MessageExchange exchange) {
+    public Flow chooseFlow(Flow[] flows, MessageExchange exchange) throws MessagingException {
         // Check if flow was specified
         String flow = (String) exchange.getProperty(JbiConstants.FLOW_PROPERTY_NAME);
         if (flow != null) {
+            Flow foundFlow = null;
             for (int i = 0; i < flows.length; i++) {
-                if (flows[i].getName().equals(flow)) {
-                    if (flows[i].canHandle(exchange)) {
-                        return flows[i];
-                    } else {
-                        log.debug("Flow '" + flow + "' was specified but not able to handle exchange");
-                    }
+                if (flows[i].getName().equalsIgnoreCase(flow)) {
+                    foundFlow = flows[i];
+                    break;
                 }
             }
-            log.debug("Flow '" + flow + "' was specified but not found");
+            if (foundFlow == null) {
+                throw new MessagingException("Flow '" + flow + "' was specified but not found");
+            } if (foundFlow.canHandle(exchange)) {
+                return foundFlow;
+            } else {
+                throw new MessagingException("Flow '" + flow + "' was specified but not able to handle exchange");
+            }
         }
         // Check against flow capabilities
         for (int i = 0; i < flows.length; i++) {
             if (flows[i].canHandle(exchange)) {
+                ((MessageExchangeImpl) exchange).getPacket().setProperty(JbiConstants.FLOW_PROPERTY_NAME, flows[i].getName());
                 return flows[i];
             }
         }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java Fri Jun  2 08:59:39 2006
@@ -33,7 +33,7 @@
      * @param broker
      * @throws JBIException
      */
-    public void init(Broker broker, String name) throws JBIException;
+    public void init(Broker broker) throws JBIException;
     
     /**
      * The description of Flow

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowChooser.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowChooser.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowChooser.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowChooser.java Fri Jun  2 08:59:39 2006
@@ -16,9 +16,18 @@
 package org.apache.servicemix.jbi.nmr.flow;
 
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 
 public interface FlowChooser {
 
-    Flow chooseFlow(Flow[] flows, MessageExchange exchange);
+    /**
+     * Choose a flow amongst the available flows.
+     * 
+     * @param flows
+     * @param exchange
+     * @return the selected flow (must be non-null)
+     * @throws MessagingException if a flow can not be chosen
+     */
+    Flow chooseFlow(Flow[] flows, MessageExchange exchange) throws MessagingException;
     
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/FlowProvider.java Fri Jun  2 08:59:39 2006
@@ -75,7 +75,7 @@
         }
     }
 
-    protected static String getFlowName(String str){
+    public static String getFlowName(String str){
         String result=str;
         int index=str.indexOf('?');
         if(index>=0){

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Fri Jun  2 08:59:39 2006
@@ -40,6 +40,7 @@
 import javax.resource.spi.ConnectionManager;
 import javax.resource.spi.ResourceAdapter;
 import javax.resource.spi.ResourceAdapterInternalException;
+import javax.transaction.Status;
 import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
 
@@ -229,9 +230,9 @@
      * @param broker
      * @throws JBIException
      */
-    public void init(Broker broker, String subType) throws JBIException {
+    public void init(Broker broker) throws JBIException {
         log.info(broker.getContainer().getName() + ": Initializing jca flow");
-        super.init(broker, subType);
+        super.init(broker);
         // Create and register endpoint listener
         endpointListener = new EndpointAdapter() {
             public void internalEndpointRegistered(EndpointEvent event) {
@@ -310,10 +311,13 @@
                             Object obj = ((ObjectMessage) message).getObject();
                             if (obj instanceof EndpointEvent) {
                                 EndpointEvent event = (EndpointEvent) obj;
-                                if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
-                                    onRemoteEndpointRegistered(event);
-                                } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
-                                    onRemoteEndpointUnregistered(event);
+                                String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace().getContainerName();
+                                if (!getBroker().getContainer().getName().equals(container)) {
+                                    if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+                                        onRemoteEndpointRegistered(event);
+                                    } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+                                        onRemoteEndpointUnregistered(event);
+                                    }
                                 }
                             }
                         } catch (Exception e) {
@@ -550,10 +554,16 @@
                     destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
                 }
             }
+            if (me.isTransacted()) {
+                me.setTxState(MessageExchangeImpl.TX_STATE_ENLISTED);
+            }
             sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
         } catch (JMSException e) {
             log.error("Failed to send exchange: " + me + " internal JMS Network", e);
             throw new MessagingException(e);
+        } catch (SystemException e) {
+            log.error("Failed to send exchange: " + me + " transaction problem", e);
+            throw new MessagingException(e);
         }
     }
 
@@ -679,7 +689,13 @@
         return broker.getContainer().getName() + " JCAFlow";
     }
     
-    private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException {
+    private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException, SystemException {
+        if (transacted) {
+            TransactionManager tm = (TransactionManager) getBroker().getContainer().getTransactionManager();
+            if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
+                return;
+            }
+        }
     	Connection connection = connectionFactory.createConnection();
     	try {
     		Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Fri Jun  2 08:59:39 2006
@@ -205,9 +205,9 @@
      * @param broker
      * @throws JBIException
      */
-    public void init(Broker broker, String subType) throws JBIException {
+    public void init(Broker broker) throws JBIException {
         log.info(broker.getContainer().getName() + ": Initializing jms flow");
-        super.init(broker, subType);
+        super.init(broker);
         // Create and register endpoint listener
         endpointListener = new EndpointAdapter() {
             public void internalEndpointRegistered(EndpointEvent event) {
@@ -276,10 +276,13 @@
                             Object obj = ((ObjectMessage) message).getObject();
                             if (obj instanceof EndpointEvent) {
                                 EndpointEvent event = (EndpointEvent) obj;
-                                if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
-                                    onRemoteEndpointRegistered(event);
-                                } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
-                                    onRemoteEndpointUnregistered(event);
+                                String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace().getContainerName();
+                                if (!getBroker().getContainer().getName().equals(container)) {
+                                    if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+                                        onRemoteEndpointRegistered(event);
+                                    } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+                                        onRemoteEndpointUnregistered(event);
+                                    }
                                 }
                             }
                         } catch (Exception e) {

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java Fri Jun  2 08:59:39 2006
@@ -20,11 +20,14 @@
 
 import javax.jbi.JBIException;
 import javax.jbi.management.LifeCycleMBean;
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.management.JMException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.ObjectName;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
 
 import org.apache.servicemix.jbi.event.ComponentAdapter;
 import org.apache.servicemix.jbi.event.ComponentEvent;
@@ -69,8 +72,8 @@
      * @param broker
      * @throws JBIException
      */
-    public void init(Broker broker, String subType) throws JBIException {
-        super.init(broker, subType);
+    public void init(Broker broker) throws JBIException {
+        super.init(broker);
         listener = new ComponentAdapter() {
             public void componentShutDown(ComponentEvent event) {
                 onComponentShutdown(event.getComponent().getComponentNameSpace());
@@ -85,7 +88,21 @@
      * @return true if this flow can handle the given exchange
      */
     public boolean canHandle(MessageExchange me) {
-        return !isPersistent(me) && !isClustered(me);
+        if (isPersistent(me)) {
+            return false;
+        }
+        if (isClustered(me)) {
+            return false;
+        }
+        if (isTransacted(me)) {
+            if (!isSynchronous(me)) {
+                // we have the mirror, so the role is the one for the target component
+                if (me.getStatus() == ExchangeStatus.ACTIVE) {
+                    return false;
+                }
+            }
+        }
+        return true;
     }
     
     /**
@@ -146,14 +163,16 @@
         if (me.getDestinationId() == null) {
             me.setDestinationId(((AbstractServiceEndpoint) me.getEndpoint()).getComponentNameSpace());
         }
-        if (me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) == null &&
-            me.getSyncState() == MessageExchangeImpl.SYNC_STATE_ASYNC &&
-            me.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_ASYNC) {
-        	enqueuePacket(me);
-        }
-        else {
-            doRouting(me);
+        if (isTransacted(me)) {
+            me.setTxState(MessageExchangeImpl.TX_STATE_CONVEYED);
         }
+        suspendTx(me);
+        enqueuePacket(me);
+    }
+    
+    protected void doRouting(MessageExchangeImpl me) throws MessagingException {
+        resumeTx(me);
+        super.doRouting(me);
     }
 
     /**
@@ -269,4 +288,42 @@
         helper.addAttribute(getObjectToManage(), "queueNumber", "number of running SedaQueues");
         return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
     }
+
+    protected void suspendTx(MessageExchangeImpl me) throws MessagingException {
+        try {
+            Transaction oldTx = me.getTransactionContext();
+            if (oldTx != null) {
+                TransactionManager tm = (TransactionManager) getBroker().getContainer().getTransactionManager();
+                if (tm != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this);
+                    }
+                    Transaction tx = tm.suspend();
+                    if (tx != oldTx) {
+                        throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new MessagingException(e);
+        }
+    }
+
+    protected void resumeTx(MessageExchangeImpl me) throws MessagingException {
+        try {
+            Transaction oldTx = me.getTransactionContext();
+            if (oldTx != null) {
+                TransactionManager tm = (TransactionManager) getBroker().getContainer().getTransactionManager();
+                if (tm != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this);
+                    }
+                    tm.resume(oldTx);
+                }
+            }
+        } catch (Exception e) {
+            throw new MessagingException(e);
+        }
+    }
+
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaQueue.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaQueue.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaQueue.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaQueue.java Fri Jun  2 08:59:39 2006
@@ -27,11 +27,14 @@
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.management.JMException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.ObjectName;
 import javax.resource.spi.work.Work;
 import javax.resource.spi.work.WorkException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
 
 /**
  * A simple Straight through flow
@@ -128,8 +131,9 @@
      * 
      * @param packet
      * @throws InterruptedException
+     * @throws MessagingException 
      */
-    public void enqueue(MessageExchange me) throws InterruptedException {
+    public void enqueue(MessageExchange me) throws InterruptedException, MessagingException {
         queue.put(me);
     }
 
@@ -277,4 +281,5 @@
     public void setObjectName(ObjectName objectName) {
         this.objectName = objectName;
     }
+
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java Fri Jun  2 08:59:39 2006
@@ -19,8 +19,10 @@
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
 
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
 
 /**
  * A simple Straight through flow.
@@ -61,7 +63,19 @@
      * @return true if this flow can handle the given exchange
      */
     public boolean canHandle(MessageExchange me) {
-        return !isPersistent(me) && !isClustered(me);
+        if (isPersistent(me)) {
+            return false;
+        }
+        if (isClustered(me)) {
+            return false;
+        }
+        // We can not handle transactional exchanges:
+        //  * asynchronous is a bit weird when the transaction is conveyed
+        //  * synchronous could lead to deadlock if the provider uses Push delivery
+        if (isTransacted(me)) {
+            return false;
+        }
+        return true;
     }
     
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/SedaFlowTransactionTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/SedaFlowTransactionTest.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/SedaFlowTransactionTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/SedaFlowTransactionTest.java Fri Jun  2 08:59:39 2006
@@ -26,4 +26,23 @@
 	protected Flow createFlow() {
 		return new SedaFlow();
 	}
+
+    public void testAsyncSendSyncReceive() throws Exception {
+        try {
+            super.testAsyncSendSyncReceive();
+            fail("Seda flow does not handle asynchronous transactional exchanges");
+        } catch (Exception e) {
+            // Seda flow does not handle asynchronous transactional exchanges
+        }
+    }
+
+    public void testAsyncSendAsyncReceive() throws Exception {
+        try {
+            super.testAsyncSendAsyncReceive();
+            fail("Seda flow does not handle asynchronous transactional exchanges");
+        } catch (Exception e) {
+            // Seda flow does not handle asynchronous transactional exchanges
+        }
+    }
+
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/StFlowTransactionTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/StFlowTransactionTest.java?rev=411195&r1=411194&r2=411195&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/StFlowTransactionTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/StFlowTransactionTest.java Fri Jun  2 08:59:39 2006
@@ -26,4 +26,40 @@
 	protected Flow createFlow() {
 		return new STFlow();
 	}
+
+    public void testAsyncSendSyncReceive() throws Exception {
+        try {
+            super.testAsyncSendSyncReceive();
+            fail("ST flow does not handle asynchronous transactional exchanges");
+        } catch (Exception e) {
+            // ST flow does not handle asynchronous transactional exchanges
+        }
+    }
+
+    public void testAsyncSendAsyncReceive() throws Exception {
+        try {
+            super.testAsyncSendAsyncReceive();
+            fail("ST flow does not handle asynchronous transactional exchanges");
+        } catch (Exception e) {
+            // ST flow does not handle asynchronous transactional exchanges
+        }
+    }
+
+    public void testSyncSendAsyncReceive() throws Exception {
+        try {
+            super.testSyncSendAsyncReceive();
+            fail("ST flow does not handle synchronous transactional exchanges");
+        } catch (Exception e) {
+            // ST flow does not handle synchronous transactional exchanges
+        }
+    }
+
+    public void testSyncSendSyncReceive() throws Exception {
+        try {
+            super.testSyncSendSyncReceive();
+            fail("ST flow does not handle synchronous transactional exchanges");
+        } catch (Exception e) {
+            // ST flow does not handle synchronous transactional exchanges
+        }
+    }
 }



Mime
View raw message