geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdil...@apache.org
Subject svn commit: r577545 [2/3] - in /geronimo/sandbox/gshell/trunk: gshell-assembly/src/main/underlay/etc/ gshell-remote/gshell-remote-client/src/main/java/org/apache/geronimo/gshell/remote/client/ gshell-remote/gshell-remote-common/src/main/java/org/apache...
Date Thu, 20 Sep 2007 04:55:22 GMT
Added: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Requestor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Requestor.java?rev=577545&view=auto
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Requestor.java (added)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Requestor.java Wed Sep 19 21:55:19 2007
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geronimo.gshell.remote.request;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geronimo.gshell.remote.message.Message;
+import org.apache.geronimo.gshell.remote.transport.Transport;
+import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
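+ * Sends messages wrapped as {@link Request}s through an IoSession or Transport, optionally waiting for the response.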
+ *
+ * @version $Rev$ $Date$
+ */
+public class Requestor
+{
+    public static final long DEFAULT_TIMEOUT = 60;
+
+    public static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    
+    private final MessageWriter writer;
+
+    private final long timeout;
+
+    private final TimeUnit unit;
+
+    private Requestor(final MessageWriter writer, final long timeout, final TimeUnit unit) {
+        assert writer != null;
+
+        this.writer = writer;
+        this.timeout = timeout;
+        this.unit = unit;
+    }
+
+    public Requestor(final IoSession session, final long timeout, final TimeUnit unit) {
+        this(new SessionMessageWriter(session), timeout, unit);
+    }
+
+    public Requestor(final IoSession session) {
+        this(session, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
+    }
+
+    public Requestor(final Transport transport, final long timeout, final TimeUnit unit) {
+        this(new TransportMessageWriter(transport), timeout, unit);
+    }
+
+    public Requestor(final Transport transport) {
+        this(transport, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
+    }
+
+    public RequestWriteFuture submit(final Message msg, final long timeout, final TimeUnit unit) throws Exception {
+        assert msg != null;
+
+        Request req = new Request(msg, timeout, unit);
+
+        WriteFuture wf = writer.write(req);
+
+        return new RequestWriteFuture(wf, req);
+    }
+
+    public RequestWriteFuture submit(final Message msg) throws Exception {
+        return submit(msg, timeout, unit);
+    }
+
+    /**
+     * Submits a message as a request, then waits (via {@code Request.awaitResponse}) for its response.
+     *
+     * @param msg      the message to send; asserted non-null
+     * @param timeout  how long to wait for the response; also passed to the submitted request
+     * @param unit     the unit of {@code timeout}
+     * @return the message carried by the response
+     * @throws IllegalStateException if {@code awaitResponse} yields no response
+     */
+    public Message request(final Message msg, final long timeout, final TimeUnit unit) throws Exception {
+        assert msg != null;
+        RequestWriteFuture wf = submit(msg, timeout, unit);
+        Request req = wf.getRequest();
+
+        // Wait using the caller-supplied timeout and unit rather than a fixed 30 second limit
+        Response resp = req.awaitResponse(timeout, unit);
+
+        if (resp == null) {
+            throw new IllegalStateException("Failed to read response");
+        }
+
+        return resp.getMessage();
+    }
+
+    public Message request(final Message msg) throws Exception {
+        return request(msg, timeout, unit);
+    }
+
+    //
+    // MessageWriter
+    //
+
+    private static interface MessageWriter
+    {
+        WriteFuture write(Object message) throws Exception;
+    }
+
+    private static class SessionMessageWriter
+        implements MessageWriter
+    {
+        private final IoSession session;
+
+        public SessionMessageWriter(IoSession session) {
+            this.session = session;
+        }
+
+        public WriteFuture write(Object message) throws Exception {
+            return session.write(message);
+        }
+    }
+
+    private static class TransportMessageWriter
+        implements MessageWriter
+    {
+        private final Transport transport;
+
+        public TransportMessageWriter(Transport transport) {
+            this.transport = transport;
+        }
+
+        public WriteFuture write(Object message) throws Exception {
+            return transport.send(message);
+        }
+    }
+
+    //
+    // RequestWriteFuture
+    //
+
+    public class RequestWriteFuture
+        implements WriteFuture
+    {
+        private final WriteFuture delegate;
+
+        private final Request request;
+
+        public RequestWriteFuture(final WriteFuture wf, final Request req) {
+            assert wf != null;
+            assert req != null;
+
+            this.delegate = wf;
+            this.request = req;
+        }
+
+        public Request getRequest() {
+            return request;
+        }
+
+        //
+        // WriteFuture
+        //
+
+        public boolean isWritten() {
+            return delegate.isWritten();
+        }
+
+        public void setWritten(boolean written) {
+            delegate.setWritten(written);
+        }
+
+        public IoSession getSession() {
+            return delegate.getSession();
+        }
+
+        public WriteFuture await() throws InterruptedException {
+            return delegate.await();
+        }
+
+        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+            return delegate.await(timeout, unit);
+        }
+
+        public boolean await(long timeoutMillis) throws InterruptedException {
+            return delegate.await(timeoutMillis);
+        }
+
+        public WriteFuture awaitUninterruptibly() {
+            return delegate.awaitUninterruptibly();
+        }
+
+        public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
+            return delegate.awaitUninterruptibly(timeout, unit);
+        }
+
+        public boolean awaitUninterruptibly(long timeoutMillis) {
+            return delegate.awaitUninterruptibly(timeoutMillis);
+        }
+
+        @Deprecated
+        public void join() {
+            delegate.join();
+        }
+
+        @Deprecated
+        public boolean join(long timeoutMillis) {
+            return delegate.join(timeoutMillis);
+        }
+
+        public boolean isReady() {
+            return delegate.isReady();
+        }
+
+        public WriteFuture addListener(IoFutureListener listener) {
+            return delegate.addListener(listener);
+        }
+
+        public WriteFuture removeListener(IoFutureListener listener) {
+            return delegate.removeListener(listener);
+        }
+    }
+}
\ No newline at end of file

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Requestor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Requestor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Requestor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Response.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Response.java?rev=577545&view=auto
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Response.java (added)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Response.java Wed Sep 19 21:55:19 2007
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geronimo.gshell.remote.request;
+
+import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
+import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
+import org.apache.geronimo.gshell.remote.message.Message;
+
+//
+// NOTE: Snatched and massaged from Apache Mina
+//
+
+/**
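+ * A response to a {@link Request}: the request it answers, the response message, and the response {@link Type}.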
+ *
+ * @version $Rev$ $Date$
+ */
+public class Response
+{
+    private final Request request;
+
+    private final Type type;
+
+    private final Message message;
+
+    public Response(final Request request, final Message message, final Type type) {
+        assert request != null;
+        assert message != null;
+        assert type != null;
+
+        this.request = request;
+        this.type = type;
+        this.message = message;
+    }
+
+    public Request getRequest() {
+        return request;
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+
+    public int hashCode() {
+        return getRequest().getId().hashCode();
+    }
+
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        else if (obj == null) {
+            return false;
+        }
+        else if (!(obj instanceof Response)) {
+            return false;
+        }
+
+        Response resp = (Response) obj;
+
+        return getRequest().equals(resp.getRequest()) && getType().equals(resp.getType());
+
+    }
+
+    public String toString() {
+        return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+
+    //
+    // Response Type
+    //
+
+    public static enum Type
+    {
+        WHOLE,
+        PARTIAL,
+        PARTIAL_LAST
+    }
+}

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Response.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Response.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Response.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/package-info.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/package-info.java?rev=577545&view=auto
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/package-info.java (added)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/package-info.java Wed Sep 19 21:55:19 2007
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Request response support for messages.
+ *
+ * @version $Rev$ $Date$
+ */
+package org.apache.geronimo.gshell.remote.request;
\ No newline at end of file

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/package-info.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/package-info.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/security/SecurityFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/security/SecurityFilter.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/security/SecurityFilter.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/security/SecurityFilter.java Wed Sep 19 21:55:19 2007
@@ -45,8 +45,6 @@
 public class SecurityFilter
     extends IoFilterAdapter
 {
-    public static final String NAME = "security";
-    
     private static final String AUTHENTICATED_KEY = SecurityFilter.class.getName() + ".authenticated";
 
     private static final String REMOTE_PUBLIC_KEY_KEY = SecurityFilter.class.getName() + ".remotePublicKey";
@@ -66,6 +64,10 @@
     private final UUID securityToken;
 
     public SecurityFilter() throws Exception {
+        //
+        // TODO: Would be nice to use the scheduler from the transport and not create another one... ?? or not...
+        //
+        
         scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
 
         //
@@ -157,7 +159,11 @@
             
             // And then send back our public key to the remote client
             msg.reply(new HandShakeMessage.Result(crypto.getPublicKey()));
-            
+
+            //
+            // NOTE: Don't wait on the write future
+            //
+
             // Schedule a task to timeout the login process
             scheduleTimeout(session);
         }
@@ -191,6 +197,10 @@
                 log.info("Successfull authentication for user: {}, at location: {}", username, session.getRemoteAddress());
 
                 msg.reply(new LoginMessage.Result());
+
+                //
+                // NOTE: Don't wait on the write future
+                //
             }
         }
     }
@@ -209,7 +219,7 @@
     }
 
     private ScheduledFuture scheduleTimeout(final IoSession session) {
-        return scheduleTimeout(session, 10, TimeUnit.SECONDS);
+        return scheduleTimeout(session, 30, TimeUnit.SECONDS);
     }
     
     private boolean cancelTimeout(final IoSession session) {

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionInputStream.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionInputStream.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionInputStream.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionInputStream.java Wed Sep 19 21:55:19 2007
@@ -17,17 +17,19 @@
  * under the License.
  */
 
+package org.apache.geronimo.gshell.remote.stream;
+
 //
-// NOTE: Snatched from Apache Mina
+// NOTE: Snatched and massaged from Apache Mina
 //
 
-package org.apache.geronimo.gshell.remote.stream;
-
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.geronimo.gshell.remote.message.WriteStreamMessage;
+import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
+import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +41,7 @@
 public class SessionInputStream
     extends InputStream
 {
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final Logger log = LoggerFactory.getLogger(SessionInputStream.class);
     
     private final Object mutex = new Object();
 
@@ -52,12 +54,15 @@
     private IOException exception;
 
     public SessionInputStream() {
-        buff = ByteBuffer.allocate(256);
+        buff = ByteBuffer.allocate(16);
         buff.setAutoExpand(true);
-        // buff.limit(0);
+        buff.limit(0);
+    }
+
+    public String toString() {
+        return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
-    @Override
     public int available() {
         if (released) {
             return 0;
@@ -69,7 +74,6 @@
         }
     }
 
-    @Override
     public void close() {
         if (closed) {
             return;
@@ -77,13 +81,13 @@
 
         synchronized (mutex) {
             closed = true;
+
             releaseBuffer();
 
             mutex.notifyAll();
         }
     }
 
-    @Override
     public int read() throws IOException {
         synchronized (mutex) {
             if (!waitForData()) {
@@ -94,7 +98,6 @@
         }
     }
 
-    @Override
     public int read(final byte[] b, final int off, final int len) throws IOException {
         synchronized (mutex) {
             if (!waitForData()) {
@@ -136,6 +139,7 @@
 
         if (exception != null) {
             releaseBuffer();
+
             throw exception;
         }
 
@@ -158,7 +162,9 @@
 
     public void write(final WriteStreamMessage msg) {
         assert msg != null;
-        
+
+        log.debug("Writing...");
+
         synchronized (mutex) {
             if (closed) {
                 return;
@@ -167,21 +173,24 @@
             ByteBuffer src = msg.getBuffer();
 
             log.debug("Filling {} byte(s) into stream from: {}", src.remaining(), src);
-            
+
             if (buff.hasRemaining()) {
                 log.debug("Buffer has remaining: {} byte(s)", buff.remaining());
-                
+
                 buff.compact();
+                buff.put(src);
+                buff.flip();
             }
             else {
                 buff.clear();
-            }
-
-            buff.put(src);
-            buff.flip();
+                buff.put(src);
+                buff.flip();
 
-            mutex.notifyAll();
+                mutex.notifyAll();
+            }
         }
+
+        log.debug("Done");
     }
 
     public void throwException(final IOException e) {
@@ -192,5 +201,42 @@
                 mutex.notifyAll();
             }
         }
+    }
+
+    //
+    // Session Access
+    //
+
+    public static SessionInputStream lookup(final IoSession session) {
+        assert session != null;
+
+        SessionInputStream in = (SessionInputStream) session.getAttribute(SessionInputStream.class.getName());
+
+        if (in == null) {
+            throw new IllegalStateException("Input stream not bound");
+        }
+
+        return in;
+    }
+
+    public static void bind(final IoSession session, final SessionInputStream in) {
+        assert session != null;
+        assert in != null;
+
+        Object obj = session.getAttribute(SessionInputStream.class.getName());
+
+        if (obj != null) {
+            throw new IllegalStateException("Input stream already bound");
+        }
+
+        session.setAttribute(SessionInputStream.class.getName(), in);
+
+        log.trace("Bound input stream: {}", in);
+    }
+
+    public static SessionInputStream unbind(final IoSession session) {
+        assert session != null;
+
+        return (SessionInputStream) session.removeAttribute(SessionInputStream.class.getName());
     }
 }

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionOutputStream.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionOutputStream.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionOutputStream.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionOutputStream.java Wed Sep 19 21:55:19 2007
@@ -17,19 +17,22 @@
  * under the License.
  */
 
+package org.apache.geronimo.gshell.remote.stream;
+
 //
-// NOTE: Snatched from Apache Mina
+// NOTE: Snatched and massaged from Apache Mina
 //
 
-package org.apache.geronimo.gshell.remote.stream;
-
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.geronimo.gshell.remote.message.WriteStreamMessage;
+import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
+import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
+import org.apache.geronimo.gshell.remote.message.Message;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteFuture;
+import org.apache.mina.util.NewThreadExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,74 +44,151 @@
 public class SessionOutputStream
     extends OutputStream
 {
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    
+    private static final Logger log = LoggerFactory.getLogger(SessionOutputStream.class);
+
+    private final Object mutex = new Object();
+
     private final IoSession session;
 
     private WriteFuture lastWriteFuture;
 
+    private volatile boolean opened;
+
     public SessionOutputStream(final IoSession session) {
         assert session != null;
         
         this.session = session;
+
+        this.opened = true;
+    }
+
+    public String toString() {
+        return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
-    @Override
     public void close() throws IOException {
+        if (!opened) {
+            return;
+        }
+        
         try {
             flush();
+
+            opened = false;
         }
         finally {
             super.close();
         }
     }
 
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        write(ByteBuffer.wrap(b.clone(), off, len));
+    }
+
+    public void write(final int b) throws IOException {
+        ByteBuffer buff = ByteBuffer.allocate(1);
+
+        buff.put((byte) b);
+
+        buff.flip();
+
+        write(buff);
+    }
+
     private void ensureOpened() throws IOException {
-        if (!session.isConnected()) {
+        if (!opened || !session.isConnected()) {
             throw new IOException("The session has been closed.");
         }
     }
 
-    private synchronized void write(final ByteBuffer buff) throws IOException {
-        ensureOpened();
+    private void write(final ByteBuffer buff) throws IOException {
+        synchronized (mutex) {
+            ensureOpened();
 
-        log.debug("Writing stream from: {}", buff);
+            log.debug("Writing: {}", buff);
 
-        WriteStreamMessage msg = new WriteStreamMessage(buff);
+            Message msg = new WriteStreamMessage(buff);
 
-        lastWriteFuture = session.write(msg);
-    }
+            //
+            // TODO: See if we should hold on to each of the write futures that we go through before flushing?
+            //
+            
+            final WriteFuture wf = session.write(msg);
 
-    @Override
-    public void write(final byte[] b, final int off, final int len) throws IOException {
-        write(ByteBuffer.wrap(b.clone(), off, len));
+            lastWriteFuture = wf;
+
+            log.debug("Done");
+            
+            new NewThreadExecutor().execute(new Runnable() {
+                public void run() {
+                    log.debug("Waiting for full write");
+
+                    wf.awaitUninterruptibly();
+
+                    log.debug("Completed; written: {}", wf.isWritten());
+                }
+            });
+        }
     }
 
-    @Override
-    public void write(final int b) throws IOException {
-        ByteBuffer buff = ByteBuffer.allocate(1);
+    public void flush() throws IOException {
+        synchronized (mutex) {
+            ensureOpened();
 
-        buff.put((byte) b);
+            if (lastWriteFuture == null) {
+                return;
+            }
 
-        buff.flip();
+            log.debug("Flushing");
 
-        write(buff);
+            // Process the last write future and clear it
+            WriteFuture wf = lastWriteFuture;
+            lastWriteFuture = null;
+
+            wf.awaitUninterruptibly();
+
+            if (!wf.isWritten()) {
+                throw new IOException("Failed to fully write bytes to the session");
+            }
+
+            log.debug("Flushed");
+        }
     }
 
-    @Override
-    public synchronized void flush() throws IOException {
-        if (lastWriteFuture == null) {
-            return;
+    //
+    // Session Access
+    //
+
+    public static SessionOutputStream lookup(final IoSession session) {
+        assert session != null;
+
+        SessionOutputStream out = (SessionOutputStream) session.getAttribute(SessionOutputStream.class.getName());
+
+        if (out == null) {
+            throw new IllegalStateException("Output stream not bound");
         }
 
-        log.debug("Flushing stream...");
-        
-        lastWriteFuture.awaitUninterruptibly();
-        
-        if (!lastWriteFuture.isWritten()) {
-            throw new IOException("The bytes could not be written to the session");
+        return out;
+    }
+
+    public static void bind(final IoSession session, final SessionOutputStream out) {
+        assert session != null;
+        assert out != null;
+
+        Object obj = session.getAttribute(SessionOutputStream.class.getName());
+
+        if (obj != null) {
+            throw new IllegalStateException("Output stream already bound");
         }
 
-        log.debug("Flushed");
+        session.setAttribute(SessionOutputStream.class.getName(), out);
+
+        log.trace("Bound output stream: {}", out);
+    }
+
+    public static SessionOutputStream unbind(final IoSession session) {
+        assert session != null;
+
+        return (SessionOutputStream) session.removeAttribute(SessionOutputStream.class.getName());
     }
 }

Added: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionStreamFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionStreamFilter.java?rev=577545&view=auto
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionStreamFilter.java (added)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionStreamFilter.java Wed Sep 19 21:55:19 2007
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geronimo.gshell.remote.stream;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoFilter;
+import org.apache.mina.common.WriteRequest;
+import org.codehaus.plexus.util.IOUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
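+ * Binds a {@link SessionInputStream} and {@link SessionOutputStream} to each session and feeds received {@link WriteStreamMessage}s into the input stream.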
+ *
+ * @version $Rev$ $Date$
+ */
+public class SessionStreamFilter
+    extends IoFilterAdapter
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final ExecutorService executor;
+
+    public SessionStreamFilter(final ExecutorService executor) {
+        assert executor != null;
+
+        this.executor = executor;
+    }
+
+    public SessionStreamFilter() {
+        this(Executors.newCachedThreadPool());
+    }
+
+    //
+    // TODO: See if we need to put the executor into the session context
+    //
+
+    public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
+        log.debug("Binding session streams");
+
+        SessionInputStream.bind(session, new SessionInputStream());
+
+        SessionOutputStream.bind(session, new SessionOutputStream(session));
+
+        nextFilter.sessionCreated(session);
+    }
+
+    /*
+    public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
+        IoSessionConfig config = session.getConfig();
+
+        config.setWriteTimeout(60);
+        config.setIdleTime(IdleStatus.READER_IDLE, 60);
+
+        nextFilter.sessionOpened(session);
+    }
+    */
+    
+    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
+        log.debug("Unbinding session streams");
+
+        IOUtil.close(SessionInputStream.unbind(session));
+        
+        IOUtil.close(SessionOutputStream.unbind(session));
+
+        nextFilter.sessionClosed(session);
+    }
+
+    public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
+        log.debug("Session idle: {}, status: {}", session, status);
+
+        nextFilter.sessionIdle(session, status);
+    }
+
+    public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
+        log.debug("Exception caught: " + session + ", cause: " + cause, cause);
+
+        nextFilter.exceptionCaught(session, cause);
+    }
+
+    public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception {
+        if (message instanceof WriteStreamMessage) {
+            final WriteStreamMessage msg = (WriteStreamMessage) message;
+
+            final SessionInputStream in = SessionInputStream.lookup(session);
+
+            Runnable task = new Runnable() {
+                public void run() {
+                    log.debug("Writing stream...");
+
+                    in.write(msg);
+
+                    log.debug("Done");
+                }
+            };
+
+            executor.execute(task);
+
+            // There is no need to pass on this message to any other filters, they have no use for it...
+        }
+        else {
+            nextFilter.messageReceived(session, message);
+        }
+    }
+
+    public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+        Object message = writeRequest.getMessage();
+
+        if (message instanceof WriteStreamMessage) {
+            log.debug("Message sent: {}, msg: {}", session, message);
+
+            //
+            // TODO: Check the future's status?
+            //
+        }
+
+        nextFilter.messageSent(session, writeRequest);
+    }
+}
\ No newline at end of file

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionStreamFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionStreamFilter.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/SessionStreamFilter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/StreamFeeder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/StreamFeeder.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/StreamFeeder.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/StreamFeeder.java Wed Sep 19 21:55:19 2007
@@ -40,7 +40,7 @@
 
     private final OutputStream output;
 
-    private boolean running;
+    private volatile boolean running;
 
     public StreamFeeder(final InputStream input, final OutputStream output) {
         assert input != null;

Copied: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/WriteStreamMessage.java (from r577140, geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/message/WriteStreamMessage.java)
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/WriteStreamMessage.java?p2=geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/WriteStreamMessage.java&p1=geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/message/WriteStreamMessage.java&r1=577140&r2=577545&rev=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/message/WriteStreamMessage.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/WriteStreamMessage.java Wed Sep 19 21:55:19 2007
@@ -17,8 +17,11 @@
  * under the License.
  */
 
-package org.apache.geronimo.gshell.remote.message;
+package org.apache.geronimo.gshell.remote.stream;
 
+import org.apache.geronimo.gshell.remote.codec.MarshallingUtil;
+import org.apache.geronimo.gshell.remote.message.MessageSupport;
+import org.apache.geronimo.gshell.remote.message.MessageType;
 import org.apache.mina.common.ByteBuffer;
 
 /**
@@ -61,7 +64,7 @@
 
         super.readExternal(in);
 
-        buffer = readBuffer(in);
+        buffer = MarshallingUtil.readBuffer(in);
     }
 
     public void writeExternal(final ByteBuffer out) throws Exception {
@@ -69,6 +72,6 @@
 
         super.writeExternal(out);
 
-        writeBuffer(out, buffer);
+        MarshallingUtil.writeBuffer(out, buffer);
     }
 }

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/WriteStreamMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/WriteStreamMessage.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/stream/WriteStreamMessage.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ConnectionException.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ConnectionException.java?rev=577545&view=auto
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ConnectionException.java (added)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ConnectionException.java Wed Sep 19 21:55:19 2007
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geronimo.gshell.remote.transport;
+
+/**
+ * A {@link TransportException} specialization; presumably thrown for connection-related failures.
+ *
+ * @version $Rev$ $Date$
+ */
+public class ConnectionException
+    extends TransportException
+{
+    private static final long serialVersionUID = 1;
+
+    public ConnectionException(final String msg, final Throwable cause) {
+        super(msg, cause);
+    }
+
+    public ConnectionException(final String msg) {
+        super(msg);
+    }
+
+    public ConnectionException(final Throwable cause) {
+        super(cause);
+    }
+
+    public ConnectionException() {
+        super();
+    }
+}
\ No newline at end of file

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ConnectionException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ConnectionException.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ConnectionException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ProtocolHandler.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ProtocolHandler.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ProtocolHandler.java Wed Sep 19 21:55:19 2007
@@ -20,19 +20,11 @@
 package org.apache.geronimo.gshell.remote.transport;
 
 import org.apache.geronimo.gshell.remote.message.Message;
-import org.apache.geronimo.gshell.remote.message.MessageResponseInspector;
 import org.apache.geronimo.gshell.remote.message.MessageVisitor;
-import org.apache.geronimo.gshell.remote.message.WriteStreamMessage;
-import org.apache.geronimo.gshell.remote.stream.SessionInputStream;
-import org.apache.geronimo.gshell.remote.stream.SessionOutputStream;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.reqres.Request;
-import org.apache.mina.filter.reqres.Response;
 import org.codehaus.plexus.component.annotations.Component;
-import org.codehaus.plexus.component.annotations.Requirement;
-import org.codehaus.plexus.util.IOUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,16 +37,9 @@
 public class ProtocolHandler
     implements IoHandler
 {
-    protected Logger log = LoggerFactory.getLogger(getClass());
-
-    @Requirement
-    protected MessageResponseInspector responseInspector;
-
-    //
-    // TODO: Might be able to get this puppy injected...
-    //
+    private final Logger log = LoggerFactory.getLogger(getClass());
     
-    protected MessageVisitor visitor;
+    private MessageVisitor visitor;
 
     public MessageVisitor getVisitor() {
         return visitor;
@@ -64,199 +49,63 @@
         this.visitor = visitor;
     }
 
-    public MessageResponseInspector getResponseInspector() {
-        return responseInspector;
-    }
-
-    //
-    // Stream Access
-    //
-
-    private void setInputStream(final IoSession session, final SessionInputStream in) {
-        assert session != null;
-        assert in != null;
-
-        Object obj = session.getAttribute(Transport.INPUT_STREAM);
-
-        if (obj != null) {
-            throw new IllegalStateException("Input stream already bound");
-        }
-
-        session.setAttribute(Transport.INPUT_STREAM, in);
-
-        log.debug("Bound input stream: {}", in);
-    }
-
-    private SessionInputStream getInputStream(final IoSession session) {
-        assert session != null;
-
-        SessionInputStream in = (SessionInputStream) session.getAttribute(Transport.INPUT_STREAM);
-
-        if (in == null) {
-            throw new IllegalStateException("Input stream not bound");
-        }
-
-        return in;
-    }
-
-    private SessionInputStream removeInputStream(final IoSession session) {
-        assert session != null;
-
-        return (SessionInputStream) session.removeAttribute(Transport.INPUT_STREAM);
-    }
-
-    private void setOutputStream(final IoSession session, final SessionOutputStream out) {
-        assert session != null;
-        assert out != null;
-
-        Object obj = session.getAttribute(Transport.OUTPUT_STREAM);
-
-        if (obj != null) {
-            throw new IllegalStateException("Output stream already bound");
-        }
-
-        session.setAttribute(Transport.OUTPUT_STREAM, out);
-
-        log.debug("Bound output stream: {}", out);
-    }
-
-    private SessionOutputStream getOutputStream(final IoSession session) {
-        assert session != null;
-
-        SessionOutputStream out = (SessionOutputStream) session.getAttribute(Transport.OUTPUT_STREAM);
-
-        if (out == null) {
-            throw new IllegalStateException("Output stream not bound");
-        }
-
-
-        return out;
-    }
-
-    private SessionOutputStream removeOutputStream(final IoSession session) {
-        assert session != null;
-
-        return (SessionOutputStream) session.removeAttribute(Transport.OUTPUT_STREAM);
-    }
-
     //
-    // IoHandler
+    // TODO: Do we need to stuff the visitor into the session context?
     //
-
-    public void sessionCreated(final IoSession session) throws Exception {
+    
+    public void sessionCreated(IoSession session) throws Exception {
         log.debug("Session created: {}", session);
     }
 
     public void sessionOpened(final IoSession session) throws Exception {
-        assert session != null;
-
         log.debug("Session opened: {}", session);
-
-        //
-        // Once the session has been opened, bind streams to the session context.
-        //
-
-        setInputStream(session, new SessionInputStream());
-        
-        setOutputStream(session, new SessionOutputStream(session));
     }
 
+    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
+        log.debug("Session idle: {}, status: {}", session, status);
+    }
+    
     public void sessionClosed(final IoSession session) throws Exception {
-        assert session != null;
-
         log.debug("Session closed: {}", session);
-
-        IOUtil.close(removeInputStream(session));
-        
-        IOUtil.close(removeOutputStream(session));
-    }
-
-    public void sessionIdle(final IoSession session, final IdleStatus status) throws Exception {
-        assert session != null;
-
-        log.debug("Session idle: {}, status: {}", session, status);
-
-        if (status == IdleStatus.READER_IDLE) {
-            log.warn("Read timeout");
-        }
     }
 
     public void messageReceived(final IoSession session, final Object obj) throws Exception {
-        assert session != null;
-        assert obj != null;
-
-        log.debug("Message received: {}", obj);
-
+        log.debug("Message received: {}, message: {}", session, obj);
+        
         //
         // TODO: Need to handle Exception muck, and send faul messages back to clients
         //
 
         if (obj instanceof Message) {
-            //
             // This is the main protocol action, set the session, freeze the message and
             // then process the message with our visitor
-            //
 
-            Message msg = (Message)obj;
+            final Message msg = (Message)obj;
 
             msg.setSession(session);
             msg.freeze();
 
-            if (msg instanceof WriteStreamMessage) {
-                //
-                // HACK: See if this fucking works...
-                //
-
-                SessionInputStream in = getInputStream(session);
-
-                in.write((WriteStreamMessage)msg);
-            }
-            else if (visitor != null) {
+            //
+            // TODO: Make sure we have a visitor... gotta have it really...
+            //
+            
+            if (visitor != null) {
                 msg.process(visitor);
             }
             else {
                 log.warn("Unable to process message because vistor has not been bound; ignoring");
             }
         }
-        else if (obj instanceof Response) {
-            //
-            // Secondardy is to handle deregistration of request/resposne messages
-            //
-
-            Response resp = (Response)obj;
-
-            Request req = resp.getRequest();
-
-            responseInspector.deregister(req);
-        }
         else {
             log.error("Unhandled message: {}", obj);
         }
     }
 
-    public void messageSent(final IoSession session, final Object obj) throws Exception {
-        assert session != null;
-
-        log.debug("Message sent: {}", obj);
-
-        if (obj instanceof Request) {
-            //
-            // When request messages are sent, we need to register them with the response inspector
-            // so that when a resposne comes back we know how to correlate it with its request.
-            //
-
-            Request req = (Request) obj;
-
-            responseInspector.register(req);
-        }
+    public void messageSent(IoSession session, Object message) throws Exception {
+        log.debug("Message sent: {}, message: {}", session, message);
     }
 
     public void exceptionCaught(final IoSession session, final Throwable cause) throws Exception {
-        assert session != null;
-        assert cause != null;
-
-        log.error("Unhandled error: " + cause, cause);
-
-        session.close();
+        log.error("Exception caught: " + session + ", cause: " + cause, cause);
     }
 }

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/Transport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/Transport.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/Transport.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/Transport.java Wed Sep 19 21:55:19 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geronimo.gshell.remote.message.Message;
+import org.apache.mina.common.WriteFuture;
 
 /**
  * Provides the client-side protocol interface.
@@ -32,13 +33,7 @@
  */
 public interface Transport
 {
-    String STREAM_BASENAME = "org.apache.geronimo.gshell.remote.stream.";
-
-    String INPUT_STREAM = STREAM_BASENAME + "IN";
-
-    String OUTPUT_STREAM = STREAM_BASENAME + "OUT";
-
-    void send(Message msg) throws Exception;
+    WriteFuture send(Object msg) throws Exception;
 
     Message request(Message msg, long timeout, TimeUnit unit) throws Exception;
 

Copied: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportCommon.java (from r577140, geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportSupport.java)
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportCommon.java?p2=geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportCommon.java&p1=geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportSupport.java&r1=577140&r2=577545&rev=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportSupport.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportCommon.java Wed Sep 19 21:55:19 2007
@@ -19,41 +19,60 @@
 
 package org.apache.geronimo.gshell.remote.transport;
 
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
 import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
-import org.apache.geronimo.gshell.remote.logging.LoggingFilter;
+import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
 import org.apache.geronimo.gshell.remote.codec.MessageCodecFactory;
+import org.apache.geronimo.gshell.remote.logging.LoggingFilter;
 import org.apache.geronimo.gshell.remote.message.MessageVisitor;
+import org.apache.geronimo.gshell.remote.request.RequestResponseFilter;
+import org.apache.geronimo.gshell.remote.stream.SessionStreamFilter;
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.reqres.RequestResponseFilter;
+import org.apache.mina.management.IoSessionStat;
+import org.apache.mina.management.StatCollector;
 import org.codehaus.plexus.PlexusContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Support for {@link Transport} and {@link TransportServer} instances.
+ * Common bits for {@link Transport} and {@link TransportServer} implementations.
  *
  * @version $Rev$ $Date$
  */
-public class TransportSupport
+public abstract class TransportCommon
 {
-    public static final String PROTOCOL_FILTER_NAME = "protocol";
+    protected final Logger log = LoggerFactory.getLogger(getClass());
 
-    public static final String REQRESP_FILTER_NAME = "reqresp";
+    // protected final ExecutorService executor;
 
-    protected Logger log = LoggerFactory.getLogger(getClass());
+    // protected final ScheduledExecutorService scheduler;
 
     private PlexusContainer container;
 
+    private IoService service;
+
+    private StatCollector statCollector;
+
     private MessageVisitor messageVisitor;
 
     private ProtocolHandler protocolHandler;
-
+    
     private MessageCodecFactory codecFactory;
 
+    protected TransportCommon() {
+        //
+        // TODO: Add custom thread factory ?
+        //
+
+        // executor = new ThreadPoolExecutor(1, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+
+        // executor = Executors.newCachedThreadPool();
+
+        // scheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1);
+    }
+
     public String toString() {
         return ReflectionToStringBuilder.toString(this);
     }
@@ -61,22 +80,59 @@
     protected void configure(final IoService service) throws Exception {
         assert service != null;
 
+        this.service = service;
+
         ProtocolHandler handler = getProtocolHandler();
+
         MessageVisitor visitor = getMessageVisitor();
+
         handler.setVisitor(visitor);
+        
         service.setHandler(handler);
 
         DefaultIoFilterChainBuilder filterChain = service.getFilterChain();
+        
+        // filterChain.addLast(ExecutorFilter.class.getSimpleName(), new ExecutorFilter(executor));
 
-        if (log.isDebugEnabled()) {
-            filterChain.addLast(LoggingFilter.NAME, new LoggingFilter());
-        }
+        // filterChain.addLast(ProfilerTimerFilter.class.getSimpleName(), new ProfilerTimerFilter());
+
+        //
+        // NOTE: I read that executor might be best *after* protocol...
+        //
+
+        filterChain.addLast(ProtocolCodecFilter.class.getSimpleName(), new ProtocolCodecFilter(getMessageCodecFactory()));
+
+        filterChain.addLast(LoggingFilter.class.getSimpleName(), new LoggingFilter());
+
+        filterChain.addLast(SessionStreamFilter.class.getSimpleName(), new SessionStreamFilter());
+
+        filterChain.addLast(RequestResponseFilter.class.getSimpleName(), new RequestResponseFilter());
+
+        // Setup stat collection
+        // statCollector = new StatCollector(service);
+        // statCollector.start();
 
-        filterChain.addLast(PROTOCOL_FILTER_NAME, new ProtocolCodecFilter(getMessageCodecFactory()));
+        //
+        // TODO: Start up a scheduled task to periodically log stats
+        //
+    }
+
+    private void logStats(final IoSession session) throws Exception {
+        assert session != null;
+
+        IoSessionStat stat = (IoSessionStat) session.getAttribute(StatCollector.KEY);
+
+        if (stat != null) {
+            log.debug("Stats: {}", ReflectionToStringBuilder.toString(stat, ToStringStyle.SHORT_PREFIX_STYLE));
+        }
+    }
+    
+    public void close() {
+        // statCollector.stop();
 
-        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
+        // executor.shutdownNow();
 
-        filterChain.addLast(REQRESP_FILTER_NAME, new RequestResponseFilter(handler.getResponseInspector(), scheduler));
+        // scheduler.shutdownNow();
     }
 
     //
@@ -86,7 +142,7 @@
     public void setMessageVisitor(final MessageVisitor messageVisitor) {
         assert messageVisitor != null;
 
-        log.debug("Using message visitor: {}", messageVisitor);
+        log.trace("Using message visitor: {}", messageVisitor);
 
         this.messageVisitor = messageVisitor;
     }
@@ -102,7 +158,7 @@
     public void setProtocolHandler(final ProtocolHandler protocolHandler) {
         assert protocolHandler != null;
 
-        log.debug("Using protocol handler: {}", protocolHandler);
+        log.trace("Using protocol handler: {}", protocolHandler);
 
         this.protocolHandler = protocolHandler;
     }
@@ -118,7 +174,7 @@
     public void setMessageCodecFactory(final MessageCodecFactory codecFactory) {
         assert codecFactory != null;
 
-        log.debug("Using codec factory: {}", codecFactory);
+        log.trace("Using codec factory: {}", codecFactory);
 
         this.codecFactory = codecFactory;
     }
@@ -136,7 +192,7 @@
     //
 
     public void setContainer(final PlexusContainer container) {
-        log.debug("Using plexus container: {}", container);
+        log.trace("Using plexus container: {}", container);
 
         this.container = container;
     }

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportCommon.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportCommon.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportCommon.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportException.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportException.java?rev=577545&view=auto
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportException.java (added)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportException.java Wed Sep 19 21:55:19 2007
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geronimo.gshell.remote.transport;
+
+/**
+ * ???
+ *
+ * @version $Rev$ $Date$
+ */
+public class TransportException
+    extends RuntimeException
+{
+    private static final long serialVersionUID = 1;
+
+    public TransportException(final String msg, final Throwable cause) {
+        super(msg, cause);
+    }
+
+    public TransportException(final String msg) {
+        super(msg);
+    }
+
+    public TransportException(final Throwable cause) {
+        super(cause);
+    }
+
+    public TransportException() {
+        super();
+    }
+}
\ No newline at end of file

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportException.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportFactoryLocator.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportFactoryLocator.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportFactoryLocator.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/TransportFactoryLocator.java Wed Sep 19 21:55:19 2007
@@ -49,7 +49,7 @@
     }
 
     public static class MissingSchemeException
-        extends Exception
+        extends TransportException
     {
         public MissingSchemeException(final URI location) {
             super("Invalid location; missing scheme: " + location);

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransport.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransport.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransport.java Wed Sep 19 21:55:19 2007
@@ -24,6 +24,7 @@
 import org.apache.geronimo.gshell.remote.ssl.SSLContextFactory;
 import org.apache.geronimo.gshell.remote.transport.tcp.TcpTransport;
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.filter.ssl.SslFilter;
 
 /**
@@ -48,7 +49,7 @@
         SslFilter sslFilter = new SslFilter(sslContextFactory.createClientContext());
         sslFilter.setUseClientMode(true);
 
-        filterChain.addFirst("ssl", sslFilter);
+        filterChain.addAfter(ExecutorFilter.class.getSimpleName(), SslFilter.class.getSimpleName(), sslFilter);
     }
 
     //

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransportServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransportServer.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransportServer.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/ssl/SslTransportServer.java Wed Sep 19 21:55:19 2007
@@ -24,6 +24,7 @@
 import org.apache.geronimo.gshell.remote.ssl.SSLContextFactory;
 import org.apache.geronimo.gshell.remote.transport.tcp.TcpTransportServer;
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.filter.ssl.SslFilter;
 
 /**
@@ -46,7 +47,8 @@
         DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
 
         SslFilter sslFilter = new SslFilter(sslContextFactory.createServerContext());
-        filterChain.addFirst("ssl", sslFilter);
+
+        filterChain.addAfter(ExecutorFilter.class.getSimpleName(), SslFilter.class.getSimpleName(), sslFilter);
     }
     
     //

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransport.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransport.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransport.java Wed Sep 19 21:55:19 2007
@@ -19,26 +19,36 @@
 
 package org.apache.geronimo.gshell.remote.transport.tcp;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
+import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
 import org.apache.geronimo.gshell.remote.message.Message;
 import org.apache.geronimo.gshell.remote.message.MessageVisitor;
+import org.apache.geronimo.gshell.remote.request.Requestor;
+import org.apache.geronimo.gshell.remote.stream.SessionInputStream;
+import org.apache.geronimo.gshell.remote.stream.SessionOutputStream;
+import org.apache.geronimo.gshell.remote.transport.ConnectionException;
 import org.apache.geronimo.gshell.remote.transport.Transport;
-import org.apache.geronimo.gshell.remote.transport.TransportSupport;
+import org.apache.geronimo.gshell.remote.transport.TransportCommon;
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.reqres.Request;
-import org.apache.mina.filter.reqres.Response;
+import org.apache.mina.transport.socket.SocketSessionConfig;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 
 /**
@@ -47,25 +57,36 @@
  * @version $Rev$ $Date$
  */
 public class TcpTransport
-    extends TransportSupport
+    extends TransportCommon
     implements Transport
 {
-    private static final int CONNECT_TIMEOUT = 3000;
+    private static final int CONNECT_TIMEOUT = 5;
 
     protected final URI remoteLocation;
 
-    protected final InetSocketAddress remoteAddress;
+    protected final SocketAddress remoteAddress;
 
     protected final URI localLocation;
 
-    protected final InetSocketAddress localAddress;
+    protected final SocketAddress localAddress;
 
-    protected SocketConnector connector;
+    protected IoConnector connector;
 
     protected IoSession session;
 
     protected boolean connected;
 
+    protected TcpTransport(final URI remoteLocation, final SocketAddress remoteAddress, final URI localLocation, final SocketAddress localAddress) throws Exception {
+        assert remoteLocation != null;
+        assert remoteAddress != null;
+
+        this.remoteLocation = remoteLocation;
+        this.remoteAddress = remoteAddress;
+
+        this.localLocation = localLocation;
+        this.localAddress = localAddress;
+    }
+
     public TcpTransport(final URI remote, final URI local) throws Exception {
         assert remote != null;
         // local may be null
@@ -84,11 +105,40 @@
         }
     }
 
+    protected IoConnector createConnector() throws Exception {
+        SocketConnector connector = new SocketConnector(/*Runtime.getRuntime().availableProcessors() + 1*/ 4, /* executor */ Executors.newCachedThreadPool());
+
+        SocketSessionConfig config = connector.getSessionConfig();
+
+        // config.setTcpNoDelay(true);
+        // config.setKeepAlive(true);
+
+        return connector;
+    }
+
     protected synchronized void init() throws Exception {
-        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
+        connector = createConnector();
 
-        connector = new SocketConnector(Runtime.getRuntime().availableProcessors(), executor);
-        connector.setConnectTimeout(30);
+        connector.addListener(new IoServiceListener() {
+            public void serviceActivated(IoService service) {
+                log.info("Service activated: {}", service);
+            }
+
+            public void serviceDeactivated(IoService service) {
+                log.info("Service deactivated: {}", service);
+            }
+
+            public void sessionCreated(IoSession session) {
+                log.info("Session created: {}", session);
+            }
+
+            public void sessionDestroyed(IoSession session) {
+                log.info("Session destroyed: {}", session);
+            }
+        });
+
+
+        // connector.setConnectTimeout(30);
 
         //
         // HACK: Need to manually wire in the visitor impl for now... :-(
@@ -97,6 +147,18 @@
         setMessageVisitor((MessageVisitor) getContainer().lookup(MessageVisitor.class, "client"));
         
         configure(connector);
+
+        DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
+
+        log.debug("Service filters:");
+
+        for (IoFilterChain.Entry entry : filterChain.getAll()) {
+            log.debug("    {}", entry);
+        }
+
+        IoSessionConfig config = connector.getSessionConfig();
+
+        log.debug("Session config: {}", ReflectionToStringBuilder.toString(config, ToStringStyle.MULTI_LINE_STYLE));
     }
 
     public synchronized void connect() throws Exception {
@@ -110,11 +172,11 @@
 
         ConnectFuture cf = connector.connect(remoteAddress, localAddress);
 
-        if (cf.awaitUninterruptibly(CONNECT_TIMEOUT)) {
+        if (cf.awaitUninterruptibly(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
              session = cf.getSession();
         }
         else {
-            throw new Exception("Failed to connect");
+            throw new ConnectionException("Failed to connect in allocated time");
         }
 
         connected = true;
@@ -123,9 +185,16 @@
     }
 
     public synchronized void close() {
-        CloseFuture cf = session.close();
+        log.info("Closing");
+
+        try {
+            CloseFuture cf = session.close();
 
-        cf.awaitUninterruptibly();
+            cf.awaitUninterruptibly();
+        }
+        finally {
+            super.close();
+        }
 
         log.info("Closed");
     }
@@ -138,45 +207,33 @@
         return localLocation;
     }
 
-    private void doSend(final Object msg) throws Exception {
+    public WriteFuture send(final Object msg) throws Exception {
         assert msg != null;
 
-        WriteFuture wf = session.write(msg);
-
-        wf.awaitUninterruptibly();
-
-        if (!wf.isWritten()) {
-            throw new IOException("Session did not fully write the message");
-        }
+        return session.write(msg);
     }
 
-    public void send(final Message msg) throws Exception {
+    public Message request(final Message msg) throws Exception {
         assert msg != null;
 
-        doSend(msg);
-    }
+        Requestor requestor = new Requestor(this);
 
-    public Message request(final Message msg) throws Exception {
-        return request(msg, 5, TimeUnit.SECONDS);
+        return requestor.request(msg);
     }
     
     public Message request(final Message msg, final long timeout, final TimeUnit unit) throws Exception {
         assert msg != null;
 
-        Request req = new Request(msg.getId(), msg, timeout, unit);
+        Requestor requestor = new Requestor(this);
 
-        doSend(req);
-
-        Response resp = req.awaitResponse();
-
-        return (Message) resp.getMessage();
+        return requestor.request(msg, timeout, unit);
     }
-
+    
     public InputStream getInputStream() {
-        return (InputStream) session.getAttribute(Transport.INPUT_STREAM);
+        return SessionInputStream.lookup(session);
     }
 
     public OutputStream getOutputStream() {
-        return (OutputStream) session.getAttribute(Transport.OUTPUT_STREAM);
+        return SessionOutputStream.lookup(session);
     }
 }

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransportServer.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransportServer.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/tcp/TcpTransportServer.java Wed Sep 19 21:55:19 2007
@@ -21,15 +21,24 @@
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
+import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
 import org.apache.geronimo.gshell.remote.message.MessageVisitor;
 import org.apache.geronimo.gshell.remote.security.SecurityFilter;
+import org.apache.geronimo.gshell.remote.transport.TransportCommon;
 import org.apache.geronimo.gshell.remote.transport.TransportServer;
-import org.apache.geronimo.gshell.remote.transport.TransportSupport;
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.transport.socket.SocketSessionConfig;
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
 
 /**
@@ -38,24 +47,29 @@
  * @version $Rev$ $Date$
  */
 public class TcpTransportServer
-    extends TransportSupport
+    extends TransportCommon
     implements TransportServer
 {
     protected final URI location;
 
-    protected final InetSocketAddress address;
+    protected final SocketAddress address;
 
-    protected SocketAcceptor acceptor;
+    protected IoAcceptor acceptor;
 
     protected boolean bound;
 
     private SecurityFilter securityFilter;
-    
-    public TcpTransportServer(final URI location) throws Exception {
+
+    protected TcpTransportServer(final URI location, final SocketAddress address) {
         assert location != null;
+        assert address != null;
 
         this.location = location;
-        this.address = new InetSocketAddress(InetAddress.getByName(location.getHost()), location.getPort());
+        this.address = address;
+    }
+
+    public TcpTransportServer(final URI location) throws Exception {
+        this(location, new InetSocketAddress(InetAddress.getByName(location.getHost()), location.getPort()));
     }
 
     public URI getLocation() {
@@ -78,11 +92,41 @@
         return securityFilter;
     }
 
+    protected IoAcceptor createAcceptor() throws Exception {
+        SocketAcceptor acceptor = new SocketAcceptor(/*Runtime.getRuntime().availableProcessors() + 1*/ 4, /* executor */ Executors.newCachedThreadPool());
+
+        SocketSessionConfig config = acceptor.getSessionConfig();
+
+        // ????
+        config.setTcpNoDelay(true);
+        config.setKeepAlive(true);
+
+        return acceptor;
+    }
+
     protected synchronized void init() throws Exception {
-        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
-        acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors(), executor);
+        acceptor = createAcceptor();
+
         acceptor.setLocalAddress(address);
 
+        acceptor.addListener(new IoServiceListener() {
+            public void serviceActivated(IoService service) {
+                log.info("Service activated: {}", service);
+            }
+
+            public void serviceDeactivated(IoService service) {
+                log.info("Service deactivated: {}", service);
+            }
+
+            public void sessionCreated(IoSession session) {
+                log.info("Session created: {}", session);
+            }
+
+            public void sessionDestroyed(IoSession session) {
+                log.info("Session destroyed: {}", session);
+            }
+        });
+
         //
         // HACK: Need to manually wire in the visitor impl for now... :-(
         //
@@ -92,9 +136,19 @@
         configure(acceptor);
 
         DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
+        
+        filterChain.addLast(SecurityFilter.class.getSimpleName(), getSecurityFilter());
+
+        log.debug("Service filters:");
+        
+        for (IoFilterChain.Entry entry : filterChain.getAll()) {
+            log.debug("    {}", entry);
+        }
+
+        IoSessionConfig config = acceptor.getSessionConfig();
+
+        log.debug("Session config: {}", ReflectionToStringBuilder.toString(config, ToStringStyle.MULTI_LINE_STYLE));
 
-        // Install the authentication filter right after the protocol filter
-        filterChain.addAfter(PROTOCOL_FILTER_NAME, SecurityFilter.NAME, getSecurityFilter());
     }
 
     public synchronized void bind() throws Exception {
@@ -112,6 +166,8 @@
     }
 
     public synchronized void close() {
+        super.close();
+
         acceptor.unbind();
 
         log.info("Closed");

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/vm/VmTransport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/vm/VmTransport.java?rev=577545&r1=577544&r2=577545&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/vm/VmTransport.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/transport/vm/VmTransport.java Wed Sep 19 21:55:19 2007
@@ -19,22 +19,10 @@
 
 package org.apache.geronimo.gshell.remote.transport.vm;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.URI;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.geronimo.gshell.remote.message.Message;
-import org.apache.geronimo.gshell.remote.message.MessageVisitor;
-import org.apache.geronimo.gshell.remote.transport.Transport;
-import org.apache.geronimo.gshell.remote.transport.TransportSupport;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.reqres.Request;
-import org.apache.mina.filter.reqres.Response;
+import org.apache.geronimo.gshell.remote.transport.tcp.TcpTransport;
+import org.apache.mina.common.IoConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
 
@@ -44,133 +32,13 @@
  * @version $Rev$ $Date$
  */
 public class VmTransport
-    extends TransportSupport
-    implements Transport
+    extends TcpTransport
 {
-    private static final int CONNECT_TIMEOUT = 3000;
-    
-    protected final URI remoteLocation;
-
-    protected final VmPipeAddress remoteAddress;
-
-    protected final URI localLocation;
-
-    protected final VmPipeAddress localAddress;
-
-    protected VmPipeConnector connector;
-
-    protected IoSession session;
-
-    protected boolean connected;
-
     public VmTransport(final URI remote, final URI local) throws Exception {
-        assert remote != null;
-        // local may be null
-
-        this.remoteLocation = remote;
-        this.remoteAddress = new VmPipeAddress(remote.getPort());
-
-        if (local != null) {
-            this.localLocation = local;
-            this.localAddress = new VmPipeAddress(local.getPort());
-        }
-        else {
-            // These are final, so make sure to mark them null if we have no local address
-            this.localLocation = null;
-            this.localAddress = null;
-        }
-    }
-
-    protected synchronized void init() throws Exception {
-        connector = new VmPipeConnector();
-
-        //
-        // HACK: Need to manually wire in the visitor impl for now... :-(
-        //
-
-        setMessageVisitor((MessageVisitor) getContainer().lookup(MessageVisitor.class, "client"));
-
-        configure(connector);
-    }
-
-    public synchronized void connect() throws Exception {
-        if (connected) {
-            throw new IllegalStateException("Already connected");
-        }
-
-        init();
-
-        log.info("Connecting to: {}", remoteAddress);
-
-        ConnectFuture cf = connector.connect(remoteAddress, localAddress);
-
-        if (cf.awaitUninterruptibly(CONNECT_TIMEOUT)) {
-             session = cf.getSession();
-        }
-        else {
-            throw new Exception("Failed to connect");
-        }
-
-        connected = true;
-
-        log.info("Connected");
-    }
-
-    public synchronized void close() {
-        CloseFuture cf = session.close();
-
-        cf.awaitUninterruptibly();
-
-        log.info("Closed");
-    }
-
-    public URI getRemoteLocation() {
-        return remoteLocation;
-    }
-
-    public URI getLocalLocation() {
-        return localLocation;
-    }
-
-    private void doSend(final Object msg) throws Exception {
-        assert msg != null;
-
-        WriteFuture wf = session.write(msg);
-
-        wf.awaitUninterruptibly();
-
-        if (!wf.isWritten()) {
-            throw new IOException("Session did not fully write the message");
-        }
-    }
-
-    public void send(final Message msg) throws Exception {
-        assert msg != null;
-
-        doSend(msg);
-    }
-
-    public Message request(final Message msg) throws Exception {
-        return request(msg, 5, TimeUnit.SECONDS);
-    }
-
-    public Message request(final Message msg, final long timeout, final TimeUnit unit) throws Exception {
-        assert msg != null;
-
-        Request req = new Request(msg.getId(), msg, timeout, unit);
-
-        doSend(req);
-
-        Response resp = req.awaitResponse();
-
-        return (Message) resp.getMessage();
-    }
-
-    public InputStream getInputStream() {
-        return (InputStream) session.getAttribute(Transport.INPUT_STREAM);
+        super(remote, new VmPipeAddress(remote.getPort()), local, local != null ? new VmPipeAddress(local.getPort()) : null);
     }
 
-    public OutputStream getOutputStream() {
-        return (OutputStream) session.getAttribute(Transport.OUTPUT_STREAM);
+    protected IoConnector createConnector() throws Exception {
+        return new VmPipeConnector();
     }
 }



Mime
View raw message