camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r641152 - in /activemq/camel/trunk/components/camel-mina: ./ src/main/java/org/apache/camel/component/mina/ src/test/java/org/apache/camel/component/mina/
Date Wed, 26 Mar 2008 05:10:05 GMT
Author: davsclaus
Date: Tue Mar 25 22:10:04 2008
New Revision: 641152

URL: http://svn.apache.org/viewvc?rev=641152&view=rev
Log:
This is a combined patch for CAMEL-394 and CAMEL-395

plus:
- refactored camel-mina
- writing data using mina session.write will now wait for the operation to complete and check if the operation is a success. This is important to handle as we want the operation to complete before continuing our code.
- improved and polished some unit tests
- polished pom to use readable indentation
- mvn exec:java to see the CAMEL-395 in action from java main
- better error reporting for mistyped uri configuration
- fixed a few IDEA hints

Added:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaLoggerOptionTest.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownMockTest.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownTest.java
Modified:
    activemq/camel/trunk/components/camel-mina/pom.xml
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java

Modified: activemq/camel/trunk/components/camel-mina/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/pom.xml?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/pom.xml (original)
+++ activemq/camel/trunk/components/camel-mina/pom.xml Tue Mar 25 22:10:04 2008
@@ -20,79 +20,99 @@
 <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
 
-  <modelVersion>4.0.0</modelVersion>
+    <modelVersion>4.0.0</modelVersion>
 
-  <parent>
-    <groupId>org.apache.camel</groupId>
-    <artifactId>camel-parent</artifactId>
-    <version>1.3-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>camel-mina</artifactId>
-  <packaging>bundle</packaging>
-  <name>Camel :: MINA</name>
-  <description>Camel MINA support</description>
-
-  <properties>
-    <camel.osgi.export.pkg>org.apache.camel.component.mina.*</camel.osgi.export.pkg>
-  </properties>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.mina</groupId>
-      <artifactId>mina-core</artifactId>
-      <version>${mina-version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-simple</artifactId>
-    </dependency>
-
-    <!-- testing -->
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-core</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <childDelegation>false</childDelegation>
-          <useFile>true</useFile>
-          <forkMode>pertest</forkMode>
-          <includes>
-            <include>**/*Test.*</include>
-          </includes>
-          <excludes>
-            <!-- <exclude>**/XXXTest.*</exclude> -->
-          </excludes>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>camel-parent</artifactId>
+        <version>1.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-mina</artifactId>
+    <packaging>bundle</packaging>
+    <name>Camel :: MINA</name>
+    <description>Camel MINA support</description>
+
+    <properties>
+        <camel.osgi.export.pkg>org.apache.camel.component.mina.*</camel.osgi.export.pkg>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mina</groupId>
+            <artifactId>mina-core</artifactId>
+            <version>${mina-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+        </dependency>
+
+        <!-- testing -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymockclassextension</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <childDelegation>false</childDelegation>
+                    <useFile>true</useFile>
+                    <forkMode>pertest</forkMode>
+                    <includes>
+                        <include>**/*Test.*</include>
+                    </includes>
+                    <excludes>
+                        <!-- <exclude>**/XXXTest.*</exclude> -->
+                    </excludes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <!-- allows the special unit test to be run via 'mvn compile exec:java' -->
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <configuration>
+                    <mainClass>org.apache.camel.component.mina.MinaProducerShutdownTest</mainClass>
+                    <includePluginDependencies>false</includePluginDependencies>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Tue Mar 25 22:10:04 2008
@@ -58,9 +58,11 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * The component for using Apache MINA.
+ * Component for Apache MINA.
  * 
  * @version $Revision$
+ *
+ * @see org.apache.camel.Component
  */
 public class MinaComponent extends DefaultComponent<MinaExchange> {
     private static final transient Log LOG = LogFactory.getLog(MinaComponent.class);
@@ -77,21 +79,27 @@
 
     @Override
     protected Endpoint<MinaExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating MinaEndpoint from uri: " + uri);
+        }
+
         URI u = new URI(remaining);
 
         String protocol = u.getScheme();
-        if (protocol.equals("tcp")) {
-            return createSocketEndpoint(uri, u, parameters);
-        }
-        else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
-            return createDatagramEndpoint(uri, u, parameters);
-        }
-        else if (protocol.equals("vm")) {
-            return createVmEndpoint(uri, u);
-        }
-        else {
-            throw new IllegalArgumentException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
+        // if mistyped uri then protocol can be null
+        if (protocol != null) {
+            if (protocol.equals("tcp")) {
+                return createSocketEndpoint(uri, u, parameters);
+            }
+            else if (protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast")) {
+                return createDatagramEndpoint(uri, u, parameters);
+            }
+            else if (protocol.equals("vm")) {
+                return createVmEndpoint(uri, u);
+            }
         }
+        // protocol not resolved so error
+        throw new IllegalArgumentException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
     }
 
     protected MinaEndpoint createVmEndpoint(String uri, URI connectUri) {
@@ -106,25 +114,32 @@
         SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
         IoConnector connector = new SocketConnector();
 
+        boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
+        long timeout = getTimeoutParameter(parameters);
+        boolean transferExchange = ObjectConverter.toBool(parameters.get("transferExchange"));
+        boolean sync = ObjectConverter.toBool(parameters.get("sync"));
+        boolean minaLogger = ObjectConverter.toBool(parameters.get("minaLogger"));
+
         // connector config
         SocketConnectorConfig connectorConfig = new SocketConnectorConfig();
-        configureSocketCodecFactory(connectorConfig, parameters);
-        connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        configureSocketCodecFactory("MinaProducer", connectorConfig, parameters);
+        if (minaLogger) {
+            connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        }
+        // TODO: CAMEL-396 override connector timeout to either default or timeout provided by end user: connectorConfig.setConnectTimeout(); 
 
         // acceptor connectorConfig
         SocketAcceptorConfig acceptorConfig = new SocketAcceptorConfig();
-        configureSocketCodecFactory(acceptorConfig, parameters);
+        configureSocketCodecFactory("MinaConsumer", acceptorConfig, parameters);
         acceptorConfig.setReuseAddress(true);
         acceptorConfig.setDisconnectOnUnbind(true);
-        acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
-
-        boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
-        long timeout = getTimeoutParameter(parameters);
-        boolean transferExchange = ObjectConverter.toBool(parameters.get("transferExchange"));
+        if (minaLogger) {
+            acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        }
 
         MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation, timeout, transferExchange);
 
-        boolean sync = ObjectConverter.toBool(parameters.get("sync"));
+        // set sync or async mode after endpoint is created
         if (sync) {
             endpoint.setExchangePattern(ExchangePattern.InOut);
         } else {
@@ -134,19 +149,23 @@
         return endpoint;
     }
 
-    protected void configureSocketCodecFactory(IoServiceConfig config, Map parameters) {
-        ProtocolCodecFactory codecFactory = getCodecFactory(parameters);
+    protected void configureSocketCodecFactory(String type, IoServiceConfig config, Map parameters) {
+        ProtocolCodecFactory codecFactory = getCodecFactory(type, parameters);
 
         if (codecFactory == null) {
             boolean textline = ObjectConverter.toBool(parameters.get("textline"));
             if (textline) {
-                Charset encoding = getEncodingParameter(parameters);
+                Charset encoding = getEncodingParameter(type, parameters);
                 codecFactory = new TextLineCodecFactory(encoding);
-                LOG.debug("Using TextLineCodecFactory: " + codecFactory + " using encoding: " + encoding);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(type + ": Using TextLineCodecFactory: " + codecFactory + " using encoding: " + encoding);
+                }
             }
             else {
                 codecFactory = new ObjectSerializationCodecFactory();
-                LOG.debug("Using ObjectSerializationCodecFactory: " + codecFactory);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(type + ": Using ObjectSerializationCodecFactory: " + codecFactory);
+                }
             }
         }
 
@@ -158,23 +177,30 @@
         SocketAddress address = new InetSocketAddress(connectUri.getHost(), connectUri.getPort());
         IoConnector connector = new DatagramConnector();
 
+        boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
+        long timeout = getTimeoutParameter(parameters);
+        boolean transferExchange = false; // transfer exchange is not supported for datagram protocol
+        boolean sync = ObjectConverter.toBool(parameters.get("sync"));
+        boolean minaLogger = ObjectConverter.toBool(parameters.get("minaLogger"));
+
         DatagramConnectorConfig connectorConfig = new DatagramConnectorConfig();
-        configureDataGramCodecFactory(connectorConfig, parameters);
-        connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        configureDataGramCodecFactory("MinaProducer", connectorConfig, parameters);
+        if (minaLogger) {
+            connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        }
+        // TODO: CAMEL-396 override connector timeout to either default or timeout provided by end user: connectorConfig.setConnectTimeout();
 
         DatagramAcceptorConfig acceptorConfig = new DatagramAcceptorConfig();
-        configureDataGramCodecFactory(acceptorConfig, parameters);
+        configureDataGramCodecFactory("MinaConsumer", acceptorConfig, parameters);
         acceptorConfig.setDisconnectOnUnbind(true);
         // reuse address is default true for datagram
-        acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
-
-        boolean lazySessionCreation = ObjectConverter.toBool(parameters.get("lazySessionCreation"));
-        long timeout = getTimeoutParameter(parameters);
-        boolean transferExchange = false; // transfer exchange is not supported for datagram protocol
+        if (minaLogger) {
+            acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
+        }
 
         MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, acceptorConfig, connector, connectorConfig, lazySessionCreation, timeout, transferExchange);
 
-        boolean sync = ObjectConverter.toBool(parameters.get("sync"));
+        // set sync or async mode after endpoint is created
         if (sync) {
             endpoint.setExchangePattern(ExchangePattern.InOut);
         } else {
@@ -184,11 +210,13 @@
         return endpoint;
     }
 
-    private Charset getEncodingParameter(Map parameters) {
+    private Charset getEncodingParameter(String type, Map parameters) {
         String encoding = (String) parameters.get("encoding");
         if (encoding == null) {
             encoding = Charset.defaultCharset().name();
-            LOG.debug("No encoding parameter using default charset: " + encoding);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(type + ": No encoding parameter using default charset: " + encoding);
+            }
         }
         if (!Charset.isSupported(encoding)) {
             throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
@@ -215,9 +243,8 @@
      * For datagrams the entire message is available as a single ByteBuffer so lets just pass those around by default
      * and try converting whatever they payload is into ByteBuffers unless some custom converter is specified
      */
-    protected void configureDataGramCodecFactory(IoServiceConfig config, Map parameters) {
-
-        ProtocolCodecFactory codecFactory = getCodecFactory(parameters);
+    protected void configureDataGramCodecFactory(String type, IoServiceConfig config, Map parameters) {
+        ProtocolCodecFactory codecFactory = getCodecFactory(type, parameters);
         if (codecFactory == null) {
             codecFactory = new ProtocolCodecFactory() {
                 public ProtocolEncoder getEncoder() throws Exception {
@@ -253,10 +280,12 @@
             };
 
             // set the encoder used for this datagram codec factory
-            Charset encoding = getEncodingParameter(parameters);
+            Charset encoding = getEncodingParameter(type, parameters);
             encoder = encoding.newEncoder();
 
-            LOG.debug("Using CodecFactory: " + codecFactory + " using encoding: " + encoding);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(type + ": Using CodecFactory: " + codecFactory + " using encoding: " + encoding);
+            }
         }
 
         addCodecFactory(config, codecFactory);
@@ -267,20 +296,19 @@
         if (answer == null) {
             String value = convertTo(String.class, message);
             answer = ByteBuffer.allocate(value.length()).setAutoExpand(true);
-
-            if (value != null) {
-                answer.putString(value, encoder);
-            }
+            answer.putString(value, encoder);
         }
         return answer;
     }
 
-    protected ProtocolCodecFactory getCodecFactory(Map parameters) {
+    protected ProtocolCodecFactory getCodecFactory(String type, Map parameters) {
         ProtocolCodecFactory codecFactory = null;
         String codec = (String) parameters.get("codec");
         if (codec != null) {
             codecFactory = getCamelContext().getRegistry().lookup(codec, ProtocolCodecFactory.class);
-            LOG.debug("Using custom CodecFactory: " + codecFactory);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(type + ": Using custom CodecFactory: " + codecFactory);
+            }
         }
         return codecFactory;
     }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Tue Mar 25 22:10:04 2008
@@ -16,8 +16,10 @@
  */
 package org.apache.camel.component.mina;
 
-import org.apache.camel.Processor;
+import java.net.SocketAddress;
+
 import org.apache.camel.CamelException;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
@@ -27,10 +29,9 @@
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 
-import java.net.SocketAddress;
-
 /**
  * A {@link org.apache.camel.Consumer Consumer} implementation for Apache MINA.
+ *
  * @version $Revision$
  */
 public class MinaConsumer extends DefaultConsumer<MinaExchange> {
@@ -43,79 +44,84 @@
     public MinaConsumer(final MinaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
-        address = endpoint.getAddress();
-        acceptor = endpoint.getAcceptor();
+        this.address = endpoint.getAddress();
+        this.acceptor = endpoint.getAcceptor();
     }
 
     @Override
     protected void doStart() throws Exception {
-
+        super.doStart();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Binding to server address: " + address + " using acceptor: " + acceptor);
         }
 
-        IoHandler handler = new IoHandlerAdapter() {
+        IoHandler handler = new ReceiveHandler();
+        acceptor.bind(address, handler, endpoint.getAcceptorConfig());
+    }
 
-            @Override
-            public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
-                // close invalid session
-                if (session != null) {
-                    LOG.debug("Closing session as an exception was thrown from MINA");
-                    session.close();
-                }
+    @Override
+    protected void doStop() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Unbinding from server address: " + address + " using acceptor: " + acceptor);
+        }
+        acceptor.unbind(address);
+        super.doStop();
+    }
 
-                // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
-                throw new CamelException(cause);
+    /**
+     * Handles consuming messages and replying if the exchange is out capable.
+     */
+    private final class ReceiveHandler extends IoHandlerAdapter {
+
+        @Override
+        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+            // close invalid session
+            if (session != null) {
+                LOG.debug("Closing session as an exception was thrown from MINA");
+                session.close();
             }
 
-            @Override
-            public void messageReceived(IoSession session, Object object) throws Exception {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Received body: " + object);
-                }
+            // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
+            throw new CamelException(cause);
+        }
 
-                MinaExchange exchange = endpoint.createExchange(session, object);
-                getProcessor().process(exchange);
+        @Override
+        public void messageReceived(IoSession session, Object object) throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Received body: " + object);
+            }
 
-                if (ExchangeHelper.isOutCapable(exchange)) {
-                    Object body = MinaPayloadHelper.getOut(endpoint, exchange);
-                    boolean failed = exchange.isFailed();
-
-                    if (failed) {
-                        // can not write a response since the exchange is failed and we don't know in what state the
-                        // in/out messages are in so the session is closed
-                        LOG.warn("Can not write body since the exchange is failed, closing session: " + exchange);
-                        session.close();
-                    } else if (body == null) {
-                        // must close session if no data to write otherwise client will never receive a response
-                        // and wait forever (if not timing out)
-                        LOG.warn("Can not write body since its null, closing session: " + exchange);
-                        session.close();
-                    } else {
-                        // we got a response to write
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Writing body: " + body);
-                        }
-                        session.write(body);
-                    }
+            MinaExchange exchange = endpoint.createExchange(session, object);
+            getProcessor().process(exchange);
+
+            if (ExchangeHelper.isOutCapable(exchange)) {
+                Object body = MinaPayloadHelper.getOut(endpoint, exchange);
+                boolean failed = exchange.isFailed();
+
+                if (failed) {
+                    // can not write a response since the exchange is failed and we don't know in what state the
+                    // in/out messages are in so the session is closed
+                    LOG.warn("Can not write body since the exchange is failed, closing session: " + exchange);
+                    session.close();
+                } else if (body == null) {
+                    // must close session if no data to write otherwise client will never receive a response
+                    // and wait forever (if not timing out)
+                    LOG.warn("Can not write body since its null, closing session: " + exchange);
+                    session.close();
                 } else {
+                    // we got a response to write
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Can not write body since this exchange is not out capable: " + exchange);
+                        LOG.debug("Writing body: " + body);
                     }
+                    MinaHelper.writeBody(session, body, exchange);
+                }
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Can not write body since this exchange is not out capable: " + exchange);
                 }
             }
-        };
-
-        acceptor.bind(address, handler, endpoint.getAcceptorConfig());
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Unbinding from server address: " + address + " using acceptor: " + acceptor);
         }
-        acceptor.unbind(address);
-        super.doStop();
+
     }
 
 }

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Tue Mar 25 22:10:04 2008
@@ -62,6 +62,7 @@
         this.transferExchange = transferExchange;
     }
 
+    @SuppressWarnings({"unchecked"})
     public Producer<MinaExchange> createProducer() throws Exception {
         return new MinaProducer(this);
     }

Added: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java?rev=641152&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaHelper.java Tue Mar 25 22:10:04 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.camel.Exchange;
+import org.apache.camel.CamelExchangeException;
+
+/**
+ * Helper class used internally by camel-mina using Apache MINA.
+ */
+public class MinaHelper {
+
+    private MinaHelper() {}
+
+    /**
+     * Writes the given body to MINA session. Will wait until the body has been written.
+     * 
+     * @param session   the MINA session
+     * @param body      the body to write (send)
+     * @param exchange  the mina exchange used for error reporting
+     * @throws CamelExchangeException is thrown if the body could not be written for some reasons
+     *                                (eg remote connection is closed etc.)
+     */
+    public static void writeBody(IoSession session, Object body, Exchange exchange) throws CamelExchangeException {
+        // the write operation is asynchronous. Use WriteFuture to wait until the session has been written
+        WriteFuture future = session.write(body);
+        future.join();
+        if (!future.isWritten()) {
+            throw new CamelExchangeException("Could not write body", exchange);
+        }
+    }
+    
+}

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Tue Mar 25 22:10:04 2008
@@ -20,10 +20,10 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.Producer;
 import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
@@ -33,7 +33,7 @@
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
+import org.apache.mina.transport.socket.nio.SocketConnector;
 
 /**
  * A {@link Producer} implementation for MINA
@@ -47,7 +47,10 @@
     private CountDownLatch latch;
     private boolean lazySessionCreation;
     private long timeout;
+    private IoConnector connector;
+    private boolean sync;
 
+    @SuppressWarnings({"unchecked"})
     public MinaProducer(MinaEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
@@ -65,45 +68,43 @@
 
         Object body = MinaPayloadHelper.getIn(endpoint, exchange);
         if (body == null) {
-            LOG.warn("No payload for exchange: " + exchange);
-        } else {
-            if (ExchangeHelper.isOutCapable(exchange)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Writing body: " + body);
-                }
-
-                // write the body
-                latch = new CountDownLatch(1);
-                WriteFuture future = session.write(body);
-                future.join();
-                if (!future.isWritten()) {
-                    throw new ExchangeTimedOutException(exchange, timeout);
-                }
+            LOG.warn("No payload to send for exchange: " + exchange);
+            return; // exit early since nothing to write
+        }
 
-                // wait for response, consider timeout
-                latch.await(timeout, TimeUnit.MILLISECONDS);
-                if (latch.getCount() == 1) {
-                    throw new ExchangeTimedOutException(exchange, timeout);
-                }
+        // if sync is true then we should also wait for a response (synchronous mode)
+        sync = ExchangeHelper.isOutCapable(exchange);
+        if (sync) {
+            // only initialize latch if we should get a response
+            latch = new CountDownLatch(1);
+        }
+        // write the body
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing body: " + body);
+        }
+        MinaHelper.writeBody(session, body, exchange);
 
-                // did we get a response
-                ResponseHandler handler = (ResponseHandler) session.getHandler();
-                if (handler.getCause() != null) {
-                    throw new CamelException("Response Handler had an exception", handler.getCause());
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Handler message: " + handler.getMessage());
-                    }
-                    MinaPayloadHelper.setOut(exchange, handler.getMessage());
-                }
+        if (sync) {
+            // wait for response, consider timeout
+            LOG.debug("Waiting for response");
+            latch.await(timeout, TimeUnit.MILLISECONDS);
+            if (latch.getCount() == 1) {
+                throw new ExchangeTimedOutException(exchange, timeout);
+            }
+
+            // did we get a response
+            ResponseHandler handler = (ResponseHandler) session.getHandler();
+            if (handler.getCause() != null) {
+                throw new CamelException("Response Handler had an exception", handler.getCause());
             } else {
-                session.write(body);
+                MinaPayloadHelper.setOut(exchange, handler.getMessage());
             }
         }
     }
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
         if (!lazySessionCreation) {
             openConnection();
         }
@@ -111,18 +112,35 @@
 
     @Override
     protected void doStop() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Stopping connector: " + connector + " at address: " + endpoint.getAddress());
+        }
+
+        if (connector instanceof SocketConnector) {
+            // Change the worker timeout to 0 second to make the I/O thread quit soon when there's no connection to manage.
+            // Default worker timeout is 60 sec and therefore the client using MinaProducer can not terminate the JVM
+            // asap but must wait for the timeout to happen, so to speed this up we set the timeout to 0.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Setting SocketConnector WorkerTimeout=0 to force MINA stopping its resources faster");
+            }
+            ((SocketConnector) connector).setWorkerTimeout(0);
+        }
+
         if (session != null) {
-            session.close().join(2000);
+            session.close();
         }
+        
+        super.doStop();
     }
 
     private void openConnection() {
         SocketAddress address = endpoint.getAddress();
-        IoConnector connector = endpoint.getConnector();
+        connector = endpoint.getConnector();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating connector to address: " + address + " using connector: " + connector + " timeout: " + timeout + " millis.");
         }
         IoHandler ioHandler = new ResponseHandler(endpoint);
+        // connect and wait until the connection is established
         ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConnectorConfig());
         future.join();
         session = future.getSession();
@@ -161,13 +179,13 @@
 
         @Override
         public void sessionClosed(IoSession session) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Session closed");
-            }
-
-            if (message == null) {
-                // session was closed but no message received. This is because the remote server had an internal error
-                // and could not return a proper response. We should count down to stop waiting for a response
+            if (sync && message == null) {
+                // sync=true (InOut mode) so we expected a message as reply but did not get one before the session was closed
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Session closed but no message received from address: " + this.endpoint.getAddress());
+                }
+                // session was closed but no message received. This could be because the remote server had an internal error
+                // and could not return a response. We should count down to stop waiting for a response
                 countDown();
             }
         }
@@ -181,7 +199,6 @@
             if (ioSession != null) {
                 ioSession.close();
             }
-            countDown();
         }
 
         public Throwable getCause() {

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaLoggerOptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaLoggerOptionTest.java?rev=641152&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaLoggerOptionTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaLoggerOptionTest.java Tue Mar 25 22:10:04 2008
@@ -0,0 +1,109 @@
+package org.apache.camel.component.mina;
+
+import java.lang.reflect.Field;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.mina.common.IoSession;
+
+/**
+ * For unit testing the <tt>logger</tt> option.
+ */
+public class MinaLoggerOptionTest extends ContextTestSupport {
+
+    public void testLoggerOptionTrue() throws Exception {
+        final String uri = "mina:tcp://localhost:6321?textline=true&minaLogger=true";
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                from(uri).to("mock:result");
+            }
+        });
+
+        MockEndpoint mock = this.getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        Endpoint endpoint = context.getEndpoint(uri);
+        Exchange exchange = endpoint.createExchange();
+        Producer producer = endpoint.createProducer();
+        producer.start();
+
+        // set input and execute it
+        exchange.getIn().setBody("Hello World");
+        producer.process(exchange);
+
+        Field field = producer.getClass().getDeclaredField("session");
+        field.setAccessible(true);
+        IoSession session = (IoSession) field.get(producer);
+        assertTrue("There should be a logger filter", session.getFilterChain().contains("logger"));
+
+        producer.stop();
+
+        assertMockEndpointsSatisifed();
+    }
+
+    public void testLoggerOptionFalse() throws Exception {
+        final String uri = "mina:tcp://localhost:6321?textline=true&minaLogger=false";
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                from(uri).to("mock:result");
+            }
+        });
+
+        MockEndpoint mock = this.getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        Endpoint endpoint = context.getEndpoint(uri);
+        Exchange exchange = endpoint.createExchange();
+        Producer producer = endpoint.createProducer();
+        producer.start();
+
+        // set input and execute it
+        exchange.getIn().setBody("Hello World");
+        producer.process(exchange);
+
+        Field field = producer.getClass().getDeclaredField("session");
+        field.setAccessible(true);
+        IoSession session = (IoSession) field.get(producer);
+        assertFalse("There should NOT be a logger filter", session.getFilterChain().contains("logger"));
+
+        producer.stop();
+
+        assertMockEndpointsSatisifed();
+    }
+
+    public void testNoLoggerOption() throws Exception {
+        final String uri = "mina:tcp://localhost:6321?textline=true";
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                from(uri).to("mock:result");
+            }
+        });
+
+        MockEndpoint mock = this.getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        Endpoint endpoint = context.getEndpoint(uri);
+        Exchange exchange = endpoint.createExchange();
+        Producer producer = endpoint.createProducer();
+        producer.start();
+
+        // set input and execute it
+        exchange.getIn().setBody("Hello World");
+        producer.process(exchange);
+
+        Field field = producer.getClass().getDeclaredField("session");
+        field.setAccessible(true);
+        IoSession session = (IoSession) field.get(producer);
+        assertFalse("There should NOT default be a logger filter", session.getFilterChain().contains("logger"));
+
+        producer.stop();
+
+        assertMockEndpointsSatisifed();
+    }
+
+
+}

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownMockTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownMockTest.java?rev=641152&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownMockTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownMockTest.java Tue Mar 25 22:10:04 2008
@@ -0,0 +1,83 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.camel.component.mina;
+
+import java.lang.reflect.Field;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import static org.easymock.classextension.EasyMock.createMock;
+import static org.easymock.classextension.EasyMock.replay;
+import static org.easymock.classextension.EasyMock.verify;
+
+/**
+ * Unit test verifying that a MinaProducer can shut down properly (CAMEL-395)
+ */
+public class MinaProducerShutdownMockTest extends ContextTestSupport {
+
+    private static final String uri = "mina:tcp://localhost:6321?textline=true";
+
+    public void testProducerShutdownTestingWithMock() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+
+        // create our mock and record expected behaviour = that worker timeout should be set to 0
+        SocketConnector mockConnector = createMock(SocketConnector.class);
+        mockConnector.setWorkerTimeout(0);
+        replay(mockConnector);
+
+        // normal camel code to get a producer
+        Endpoint endpoint = context.getEndpoint(uri);
+        Exchange exchange = endpoint.createExchange();
+        Producer producer = endpoint.createProducer();
+        producer.start();
+
+        // set input and execute it
+        exchange.getIn().setBody("Hello World");
+        producer.process(exchange);
+
+        // insert our mock instead of real MINA IoConnector
+        Field field = producer.getClass().getDeclaredField("connector");
+        field.setAccessible(true);
+        field.set(producer, mockConnector);
+
+        // stop using our mock
+        producer.stop();
+
+        verify(mockConnector);
+
+        assertMockEndpointsSatisifed();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownTest.java?rev=641152&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerShutdownTest.java Tue Mar 25 22:10:04 2008
@@ -0,0 +1,91 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+
+/**
+ * Unit test verifying that a MinaProducer can shut down properly (CAMEL-395)
+ * <p>
+ * Run this test from maven: mvn exec:java and see the output if there is an error.
+ */
+public class MinaProducerShutdownTest {
+
+    private static final String uri = "mina:tcp://localhost:6321?textline=true";
+    private long start;
+
+    private CamelContext context;
+
+    public static void main(String[] args) throws Exception {
+        MinaProducerShutdownTest me = new MinaProducerShutdownTest();
+        me.testProducer();
+    }
+
+    public void testProducer() throws Exception {
+        // use shutdown hook to verify that we have stopped within 5 seconds
+        Thread hook = new AssertShutdownHook();
+        Runtime.getRuntime().addShutdownHook(hook);
+
+        start = System.currentTimeMillis();
+
+        context = new DefaultCamelContext();
+        context.addRoutes(createRouteBuilder());
+        context.start();
+
+        sendMessage();
+
+        context.stop();
+    }
+
+    private class AssertShutdownHook extends Thread {
+        public void run() {
+            long diff = System.currentTimeMillis() - start;
+            if (diff > 5000) {
+                System.err.println("ERROR: MinaProducer should be able to shutdown within 5000 millis: time=" + diff);
+            }
+        }
+    }
+
+    private void sendMessage() throws Exception {
+        Endpoint endpoint = context.getEndpoint(uri);
+        Producer producer = endpoint.createProducer();
+
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello World");
+
+        producer.start();
+        producer.process(exchange);
+        producer.stop();
+    }
+
+    private RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).to("mock:result");
+            }
+        };
+    }
+
+}

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutTest.java Tue Mar 25 22:10:04 2008
@@ -36,31 +36,30 @@
  */
 public class MinaTcpWithInOutTest extends TestCase {
 	
-    protected String uri;
-    protected Exchange receivedExchange;
-    protected CountDownLatch latch;
-    protected CamelContext container;
+    private String uri;
+    private Exchange receivedExchange;
+    private CountDownLatch latch;
+    private CamelContext container;
 	
     public void testMinaRouteWithInOut() throws Exception {
     	container = new DefaultCamelContext();
     	latch = new CountDownLatch(1);
     	uri = "mina:tcp://localhost:6321?textline=true";
-    	Producer<Exchange> producer;
-    	ReverserServer server;
-        server = new ReverserServer();
+
+        ReverserServer server = new ReverserServer();
         server.start();
+
         container.addRoutes(createRouteBuilder());
         container.start();
     	
         // now lets fire in a message
-        Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
+        Endpoint endpoint = container.getEndpoint("direct:x");
         Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
         Message message = exchange.getIn();
-        String hello = "Hello!";
-        message.setBody(hello);
+        message.setBody("Hello!");
         message.setHeader("cheese", 123);
 
-        producer = endpoint.createProducer();
+        Producer producer = endpoint.createProducer();
         producer.start();
         producer.process(exchange);
 
@@ -70,9 +69,7 @@
         assertNotNull(receivedExchange.getIn());
         assertEquals("!olleH", receivedExchange.getIn().getBody());
 
-        if (producer != null) {
-            producer.stop();
-        }
+        producer.stop();
         container.stop();
         server.stop();
     }
@@ -81,22 +78,22 @@
     	container = new DefaultCamelContext();
     	latch = new CountDownLatch(1);
     	uri = "mina:tcp://localhost:6321?textline=true&lazySessionCreation=true";
-    	Producer<Exchange> producer;
+
         container.addRoutes(createRouteBuilder());
         container.start();
-    	ReverserServer server;          //The server is activated after Camel to check if the lazyness is working
-        server = new ReverserServer();
+
+        // The server is activated after Camel to check if the laziness is working
+        ReverserServer server = new ReverserServer();
         server.start();
         
         // now lets fire in a message
-        Endpoint<Exchange> endpoint = container.getEndpoint("direct:x");
+        Endpoint endpoint = container.getEndpoint("direct:x");
         Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
         Message message = exchange.getIn();
-        String hello = "Hello!";
-        message.setBody(hello);
+        message.setBody("Hello!");
         message.setHeader("cheese", 123);
 
-        producer = endpoint.createProducer();
+        Producer producer = endpoint.createProducer();
         producer.start();
         producer.process(exchange);
 
@@ -106,27 +103,17 @@
         assertNotNull(receivedExchange.getIn());
         assertEquals("!olleH", receivedExchange.getIn().getBody());
 
-        if (producer != null) {
-            producer.stop();
-        }
+        producer.stop();
         container.stop();
         server.stop();
     }
     
-    @Override
-    protected void setUp() throws Exception {
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-    }
-
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:x").to(uri).process(new Processor() {
                     public void process(Exchange e) {
-                        System.out.println("Received exchange: " + e.getIn());
+                        // System.out.println("Received exchange: " + e.getIn());
                         receivedExchange = e;
                         latch.countDown();
                     }
@@ -134,4 +121,5 @@
             }
         };
     }
+    
 }

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java Tue Mar 25 22:10:04 2008
@@ -16,27 +16,23 @@
  */
 package org.apache.camel.component.mina;
 
-import junit.framework.TestCase;
-import org.apache.camel.CamelContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.IOException;
 
 /**
  * To test camel-mina component using a TCP client that communicates using TCP socket communication.
  *
  * @version $Revision$
  */
-public class MinaTcpWithInOutUsingPlainSocketTest extends TestCase {
-
-    protected CamelContext container = new DefaultCamelContext();
+public class MinaTcpWithInOutUsingPlainSocketTest extends ContextTestSupport {
 
     private static final int PORT = 6333;
     // use parameter sync=true to force InOut pattern of the MinaExchange
@@ -72,7 +68,7 @@
         assertNull("no data should be recieved", out);
     }
 
-    public void xtestExchangeFailedOutShouldBeNull() throws Exception {
+    public void testExchangeFailedOutShouldBeNull() throws Exception {
         String out = sendAndReceive("force-exception");
         assertTrue("out should not be the same as in when the exchange has failed", !"force-exception".equals(out));
         assertNull("no data should be retrieved", out);
@@ -117,18 +113,6 @@
         }
 
         return sb.toString();
-    }
-
-    @Override
-    protected void setUp() throws Exception {
-        container.addRoutes(createRouteBuilder());
-        container.start();
-    }
-
-
-    @Override
-    protected void tearDown() throws Exception {
-        container.stop();
     }
 
     protected RouteBuilder createRouteBuilder() {

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java Tue Mar 25 22:10:04 2008
@@ -38,12 +38,10 @@
 public class MinaTransferExchangeOptionTest extends ContextTestSupport {
 
     private static final String URI = "mina:tcp://localhost:6321?sync=true&transferExchange=true";
-    private static String id;
 
     public void testMinaTransferExchangeOption() throws Exception {
         Endpoint endpoint = context.getEndpoint(URI);
         Exchange exchange = endpoint.createExchange();
-        id = exchange.getExchangeId();
 
         Message message = exchange.getIn();
         message.setBody("Hello!");
@@ -56,11 +54,9 @@
 
         Message out = exchange.getOut();
         assertNotNull(out);
-        System.out.println("out" + out);
         assertEquals("Goodbye!", out.getBody());
         assertEquals("cheddar", out.getHeader("cheese"));
         assertEquals("fresh", exchange.getProperty("salami"));
-        assertEquals(id, exchange.getExchangeId());
 
         // in should stay the same
         Message in = exchange.getIn();
@@ -76,17 +72,12 @@
             public void configure() {
                 from(URI).process(new Processor() {
                     public void process(Exchange e) throws InterruptedException {
-                        // to force some delay to test that the id stays the same even though time is 100 millis later
-                        Thread.sleep(100);
-
                         Assert.assertNotNull(e.getIn().getBody());
                         Assert.assertNotNull(e.getIn().getHeaders());
                         Assert.assertNotNull(e.getProperties());
                         Assert.assertEquals("Hello!", e.getIn().getBody());
                         Assert.assertEquals("feta", e.getIn().getHeader("cheese"));
                         Assert.assertEquals("old", e.getProperty("ham"));
-                        // do not marshal the exchangeId here
-                        //Assert.assertEquals(id, e.getExchangeId());
                         Assert.assertEquals(ExchangePattern.InOut, e.getPattern());
 
                         e.getOut().setBody("Goodbye!");

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverseProtocolHandler.java Tue Mar 25 22:10:04 2008
@@ -30,6 +30,7 @@
  * @version $Rev$, $Date$,
  */
 public class ReverseProtocolHandler extends IoHandlerAdapter {
+    
     public void exceptionCaught(IoSession session, Throwable cause) {
         cause.printStackTrace();
         // Close connection when unexpected exception is caught.
@@ -47,4 +48,5 @@
         // and write it back.
         session.write(buf.toString());
     }
+    
 }

Modified: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java?rev=641152&r1=641151&r2=641152&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/ReverserServer.java Tue Mar 25 22:10:04 2008
@@ -23,7 +23,6 @@
 import java.nio.charset.Charset;
 
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.filter.LoggingFilter;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
@@ -42,27 +41,25 @@
 
     public void start() throws Exception {
         acceptor = new SocketAcceptor();
+
         // Prepare the configuration
         SocketAcceptorConfig cfg = new SocketAcceptorConfig();
         cfg.setReuseAddress(true);
-        cfg.getFilterChain().addLast("logger", new LoggingFilter());
-        cfg.getFilterChain().addLast(
-                "codec",
-                new ProtocolCodecFilter(new TextLineCodecFactory(Charset
-                        .forName("UTF-8"))));
+        Charset charset = Charset.forName("UTF-8");
+        cfg.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(charset)));
 
         // Bind
-        acceptor.bind(new InetSocketAddress(port),
-                new ReverseProtocolHandler(), cfg);
+        acceptor.bind(new InetSocketAddress(port), new ReverseProtocolHandler(), cfg);
 
-        System.out.println("Listening on port " + port);
+        // System.out.println("Listening on port " + port);
     }
     
-    public void stop() throws Exception{
+    public void stop() throws Exception {
         acceptor.unbindAll();
     }
     
-    public int getPort(){
+    public int getPort() {
         return port;
     }
+    
 }



Mime
View raw message