activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961195 - in /activemq/sandbox/activemq-apollo-actor: activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activemq-store/src/main/scala/org/apache/activemq/apollo/store/ activemq-tcp/src/main/java/org/apache/activemq/transport/t...
Date Wed, 07 Jul 2010 04:17:06 GMT
Author: chirino
Date: Wed Jul  7 04:17:05 2010
New Revision: 961195

URL: http://svn.apache.org/viewvc?rev=961195&view=rev
Log:
In preparation for doing fancier buffer management, changed the abstraction level of a wire format so that it's in charge of all buffer management on the transport's channels.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java Wed Jul  7 04:17:05 2010
@@ -33,6 +33,7 @@ public interface Stomp {
     AsciiBuffer FALSE = new AsciiBuffer("false");
 
     public static interface Commands {
+        AsciiBuffer STOMP = new AsciiBuffer("STOMP");
         AsciiBuffer CONNECT = new AsciiBuffer("CONNECT");
         AsciiBuffer SEND = new AsciiBuffer("SEND");
         AsciiBuffer DISCONNECT = new AsciiBuffer("DISCONNECT");

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:17:05 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import _root_.java.io.{DataOutput, DataInput, IOException}
 import _root_.org.apache.activemq.apollo.broker._
 
 import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
@@ -30,7 +29,9 @@ import Stomp.Headers._
 import BufferConversions._
 import _root_.scala.collection.JavaConversions._
 import StompFrameConstants._
-
+import java.io.{EOFException, DataOutput, DataInput, IOException}
+import org.apache.activemq.broker.store.DirectRecordStore
+import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 
 /**
  * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
@@ -69,6 +70,8 @@ class StompWireFormat extends WireFormat
   import StompWireFormat._
   override protected def log: Log = StompWireFormat
 
+  var directRecordStore:DirectRecordStore = null
+
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {
     ByteBuffer.wrap(Array(x));
@@ -87,8 +90,8 @@ class StompWireFormat extends WireFormat
   }
 
   def unmarshal(packet:Buffer):AnyRef = {
-    start = packet.offset
-    end = packet.offset
+    read_start = packet.offset
+    read_end = packet.offset
     val bb = packet.toByteBuffer
     bb.position(packet.offset + packet.length)
     unmarshalNB(bb)
@@ -120,7 +123,7 @@ class StompWireFormat extends WireFormat
       val buffer1 = frame.headers.head._1;
       val buffer2 = frame.content;
       val length = (buffer2.offset-buffer1.offset)+buffer2.length
-      os.write( buffer1.data, offset, length)
+      os.write( buffer1.data, offset, length)                                                                                            
 
     } else {
       for( (key, value) <- frame.headers ) {
@@ -137,27 +140,163 @@ class StompWireFormat extends WireFormat
 
   def getName() = "stomp"
 
+  
+  /////////////////////////////////////////////////////////////////////
   //
-  // state associated with un-marshalling stomp frames from
-  // with the  unmarshalNB method.
+  // Non blocking write imp
   //
+  /////////////////////////////////////////////////////////////////////
+
+  var write_buffer_size = 1024*64;
+  var write_counter = 0L
+  var write_channel:WritableByteChannel = null
+
+  var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
+  var write_buffer = ByteBuffer.allocate(0)
+
+  def is_full = next_write_buffer.size() >= (write_buffer_size >> 2)
+  def is_empty = write_buffer.remaining() == 0
+
+  def setWritableByteChannel(channel: WritableByteChannel) = {
+    this.write_channel = channel
+    if( this.write_channel.isInstanceOf[SocketChannel] ) {
+      this.write_channel.asInstanceOf[SocketChannel].socket().setSendBufferSize(write_buffer_size);
+    }
+  }
+
+  def getWriteCounter = write_counter
+
+
+  def write(command: Any):BufferState =  {
+    if ( is_full) {
+      WireFormat.BufferState.FULL
+    } else {
+      val was_empty = is_empty
+      marshal(command, next_write_buffer);
+      if( was_empty ) {
+        WireFormat.BufferState.WAS_EMPTY
+      } else {
+        WireFormat.BufferState.NOT_EMPTY
+      }
+    }
+  }
+
+  def flush():BufferState = {
+    
+    // if we have a pending write that is being sent over the socket...
+    if ( !is_empty ) {
+        write_counter += write_channel.write(write_buffer)
+    }
+
+    // if it is now empty try to refill...
+    if ( is_empty && next_write_buffer.size()!=0 ) {
+        // size of next buffer is based on how much was used in the previous buffer.
+        val prev_size = Math.min(Math.max(write_buffer.position()+512, 512), write_buffer_size)
+        write_buffer = next_write_buffer.toBuffer().toByteBuffer()
+        next_write_buffer = new DataByteArrayOutputStream(prev_size)
+    }
+
+    if ( is_empty ) {
+      WireFormat.BufferState.EMPTY
+    } else {
+      WireFormat.BufferState.NOT_EMPTY
+    }
+
+  }
+
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Non blocking read impl 
+  //
+  /////////////////////////////////////////////////////////////////////
+  
   type FrameReader = (ByteBuffer)=>StompFrame
 
+  var read_counter = 0L
+  var read_buffer_size = 1024*64
+  var read_channel:ReadableByteChannel = null
+
+  var read_buffer = ByteBuffer.allocate(read_buffer_size)
+  var read_end = 0
+  var read_start = 0
   var next_action:FrameReader = read_action
-  var end = 0
-  var start = 0
 
-  def unmarshalStartPos() = start
-  def unmarshalStartPos(pos:Int):Unit = {start=pos}
+  def setReadableByteChannel(channel: ReadableByteChannel) = {
+    this.read_channel = channel
+    if( this.read_channel.isInstanceOf[SocketChannel] ) {
+      this.read_channel.asInstanceOf[SocketChannel].socket().setReceiveBufferSize(read_buffer_size);
+    }
+  }
+
+  def unread(buffer: Buffer) = {
+    assert(read_counter == 0)
+    read_buffer.put(buffer.data, buffer.offset, buffer.length)
+    read_counter += buffer.length
+  }
+
+  def getReadCounter = read_counter
+
+  override def read():Object = {
+
+    var command:Object = null
+    while( command==null ) {
+      // do we need to read in more data???
+      if (read_end == read_buffer.position()) {
+
+          // do we need a new data buffer to read data into??
+          if (read_buffer.remaining() == 0) {
+
+              // How much data is still not consumed by the wireformat
+              var size = read_end - read_start
+
+              var new_capacity = if(read_start == 0) {
+                size+read_buffer_size
+              } else {
+                if (size > read_buffer_size) {
+                  size+read_buffer_size
+                } else {
+                  read_buffer_size
+                }
+              }
+
+              var new_buffer = new Array[Byte](new_capacity)
+
+              if (size > 0) {
+                  System.arraycopy(read_buffer.array(), read_start, new_buffer, 0, size)
+              }
+
+              read_buffer = ByteBuffer.wrap(new_buffer)
+              read_buffer.position(size)
+              read_start = 0
+              read_end = size
+          }
+
+          // Try to fill the buffer with data from the socket..
+          var p = read_buffer.position()
+          var count = read_channel.read(read_buffer)
+          if (count == -1) {
+              throw new EOFException("Peer disconnected")
+          } else if (count == 0) {
+              return null
+          }
+          read_counter += count
+      }
 
-  def unmarshalEndPos() = end
-  def unmarshalEndPos(pos:Int):Unit = { end = pos }
+      command = next_action(read_buffer)
+
+      // Sanity checks to make sure the wireformat is behaving as expected.
+      assert(read_start <= read_end)
+      assert(read_end <= read_buffer.position())
+    }
+    return command
+  }
 
   def unmarshalNB(buffer:ByteBuffer):Object = {
     // keep running the next action until
     // a frame is decoded or we run out of input
     var rc:StompFrame = null
-    while( rc == null && end!=buffer.position ) {
+    while( rc == null && read_end!=buffer.position ) {
       rc = next_action(buffer)
     }
 
@@ -167,19 +306,19 @@ class StompWireFormat extends WireFormat
 
   def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
       val read_limit = buffer.position
-      while( end < read_limit ) {
-        if( buffer.array()(end) =='\n') {
-          var rc = new Buffer(buffer.array, start, end-start)
-          end += 1;
-          start = end;
+      while( read_end < read_limit ) {
+        if( buffer.array()(read_end) =='\n') {
+          var rc = new Buffer(buffer.array, read_start, read_end-read_start)
+          read_end += 1
+          read_start = read_end
           return rc
         }
-        if (SIZE_CHECK && end-start > maxLength) {
-            throw new IOException(errorMessage);
+        if (SIZE_CHECK && read_end-read_start > maxLength) {
+            throw new IOException(errorMessage)
         }
-        end += 1;
+        read_end += 1
       }
-      return null;
+      return null
   }
 
   def read_action:FrameReader = (buffer)=> {
@@ -187,7 +326,7 @@ class StompWireFormat extends WireFormat
     if( line !=null ) {
       var action = line
       if( TRIM ) {
-          action = action.trim();
+          action = action.trim()
       }
       if (action.length() > 0) {
           next_action = read_headers(action)
@@ -202,43 +341,43 @@ class StompWireFormat extends WireFormat
       if( line.trim().length() > 0 ) {
 
         if (SIZE_CHECK && headers.size > MAX_HEADERS) {
-            throw new IOException("The maximum number of headers was exceeded");
+            throw new IOException("The maximum number of headers was exceeded")
         }
 
         try {
-            val seperatorIndex = line.indexOf(SEPERATOR);
+            val seperatorIndex = line.indexOf(SEPERATOR)
             if( seperatorIndex<0 ) {
-                throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+                throw new IOException("Header line missing seperator [" + ascii(line) + "]")
             }
-            var name = line.slice(0, seperatorIndex);
+            var name = line.slice(0, seperatorIndex)
             if( TRIM ) {
-                name = name.trim();
+                name = name.trim()
             }
-            var value = line.slice(seperatorIndex + 1, line.length());
+            var value = line.slice(seperatorIndex + 1, line.length())
             if( TRIM ) {
-                value = value.trim();
+                value = value.trim()
             }
             headers.add((ascii(name), ascii(value)))
         } catch {
             case e:Exception=>
               e.printStackTrace
-              throw new IOException("Unable to parser header line [" + line + "]");
+              throw new IOException("Unable to parser header line [" + line + "]")
         }
 
       } else {
         val contentLength = get(headers, CONTENT_LENGTH)
         if (contentLength.isDefined) {
           // Bless the client, he's telling us how much data to read in.
-          var length=0;
+          var length=0
           try {
-              length = Integer.parseInt(contentLength.get.trim().toString());
+              length = Integer.parseInt(contentLength.get.trim().toString())
           } catch {
             case e:NumberFormatException=>
-              throw new IOException("Specified content-length is not a valid integer");
+              throw new IOException("Specified content-length is not a valid integer")
           }
 
           if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
-              throw new IOException("The maximum data length was exceeded");
+              throw new IOException("The maximum data length was exceeded")
           }
           next_action = read_binary_body(action, headers, length)
 
@@ -275,32 +414,32 @@ class StompWireFormat extends WireFormat
 
   def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
       val read_limit = buffer.position
-      if( (read_limit-start) < contentLength+1 ) {
-        end = read_limit;
+      if( (read_limit-read_start) < contentLength+1 ) {
+        read_end = read_limit
         null
       } else {
-        if( buffer.array()(start+contentLength)!= 0 ) {
-           throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+        if( buffer.array()(read_start+contentLength)!= 0 ) {
+           throw new IOException("Exected null termintor after "+contentLength+" content bytes")
         }
-        var rc = new Buffer(buffer.array, start, contentLength)
-        end = start+contentLength+1;
-        start = end;
-        rc;
+        var rc = new Buffer(buffer.array, read_start, contentLength)
+        read_end = read_start+contentLength+1
+        read_start = read_end
+        rc
       }
   }
 
   def read_to_null(buffer:ByteBuffer):Buffer = {
       val read_limit = buffer.position
-      while( end < read_limit ) {
-        if( buffer.array()(end) ==0) {
-          var rc = new Buffer(buffer.array, start, end-start)
-          end += 1;
-          start = end;
-          return rc;
+      while( read_end < read_limit ) {
+        if( buffer.array()(read_end) ==0) {
+          var rc = new Buffer(buffer.array, read_start, read_end-read_start)
+          read_end += 1
+          read_start = read_end
+          return rc
         }
-        end += 1;
+        read_end += 1
       }
-      return null;
+      return null
   }
 
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java?rev=961195&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java Wed Jul  7 04:17:05 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.store;
+
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A memory mapped record.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DirectRecord {
+
+    public long key = -1;
+    public ByteBuffer buffer;
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 04:17:05 2010
@@ -47,8 +47,6 @@ public class TcpTransport extends BaseSe
     private static final Log LOG = LogFactory.getLog(TcpTransport.class);
 
     private Map<String, Object> socketOptions;
-    private long writeCounter;
-    private long readCounter;
 
     abstract static class SocketState {
         void onStop(Runnable onCompleted) {
@@ -177,12 +175,7 @@ public class TcpTransport extends BaseSe
     private DispatchSource readSource;
     private DispatchSource writeSource;
 
-    int bufferSize = 1024*64;
-
-    DataByteArrayOutputStream next_outbound_buffer;
-    ByteBuffer outbound_buffer;
     protected boolean useLocalHost = true;
-    ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
     boolean full = false;
 
     private final Runnable CANCEL_HANDLER = new Runnable() {
@@ -203,6 +196,12 @@ public class TcpTransport extends BaseSe
 
     public void connected(SocketChannel channel) throws IOException {
         this.channel = channel;
+
+        if( wireformat!=null ) {
+            wireformat.setReadableByteChannel(this.channel);
+            wireformat.setWritableByteChannel(this.channel);
+        }
+
         this.channel.configureBlocking(false);
         this.remoteAddress = channel.socket().getRemoteSocketAddress().toString();
         this.socketState = new CONNECTED();
@@ -214,6 +213,11 @@ public class TcpTransport extends BaseSe
         this.remoteLocation = remoteLocation;
         this.localLocation = localLocation;
 
+        if( wireformat!=null ) {
+            wireformat.setReadableByteChannel(this.channel);
+            wireformat.setWritableByteChannel(this.channel);
+        }
+
         if (localLocation != null) {
             InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
             channel.socket().bind(localAddress);
@@ -298,12 +302,6 @@ public class TcpTransport extends BaseSe
 
     private void onConnected() throws SocketException {
 
-        channel.socket().setSendBufferSize(bufferSize);
-        channel.socket().setReceiveBufferSize(bufferSize);
-
-        next_outbound_buffer = new DataByteArrayOutputStream(bufferSize);
-        outbound_buffer = ByteBuffer.allocate(0);
-
         readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
         writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
 
@@ -312,22 +310,12 @@ public class TcpTransport extends BaseSe
 
         readSource.setEventHandler(new Runnable() {
             public void run() {
-                try {
-                    drainInbound();
-                } catch (IOException e) {
-                    onTransportFailure(e);
-                }
+                drainInbound();
             }
         });
         writeSource.setEventHandler(new Runnable() {
             public void run() {
-                if (getServiceState() == STARTED) {
-                    // once the outbound is drained.. we can suspend getting
-                    // write events.
-                    if (drainOutbound()) {
-                        writeSource.suspend();
-                    }
-                }
+                drainOutbound();
             }
         });
 
@@ -350,8 +338,6 @@ public class TcpTransport extends BaseSe
         }
         
         dispatchQueue.release();
-        next_outbound_buffer = null;
-        outbound_buffer = null;
         this.wireformat = null;
     }
 
@@ -374,132 +360,64 @@ public class TcpTransport extends BaseSe
             if (getServiceState() != STARTED) {
                 throw new IOException("Not running.");
             }
+
+            WireFormat.BufferState rc = wireformat.write(command);
+            switch (rc ) {
+                case FULL:
+                    return false;
+                case WAS_EMPTY:
+                    writeSource.resume();
+                default:
+                    return true;
+            }
         } catch (IOException e) {
             onTransportFailure(e);
             return false;
         }
 
-        if ( full ) {
-            return false;
-        } else {
-            try {
-                wireformat.marshal(command, next_outbound_buffer);
-                if( next_outbound_buffer.size() >= bufferSize>>2 ) {
-                    full  = true;
-                }
-            } catch (IOException e) {
-                onTransportFailure(e);
-                return false;
-            }
-            if ( outbound_buffer.remaining()==0 ) {
-                writeSource.resume();
-            }
-            return true;
-        }
-
     }
 
     /**
      * @retruns true if there are no in progress writes.
      */
-    private boolean drainOutbound() {
+    private void drainOutbound() {
         assert Dispatch.getCurrentQueue() == dispatchQueue;
+        if (getServiceState() != STARTED || !socketState.is(CONNECTED.class)) {
+            return;
+        }
         try {
-
-            while (socketState.is(CONNECTED.class) ) {
-
-                // if we have a pending write that is being sent over the socket...
-                int remaining = outbound_buffer.remaining();
-                if (remaining!=0) {
-                    channel.write(outbound_buffer);
-                    int count = remaining - outbound_buffer.remaining();
-                    writeCounter += count;
-                    if (outbound_buffer.remaining() != 0) {
-                        return false;
-                    }
-                } else {
-                    if( next_outbound_buffer.size()!=0) {
-                        // size of next buffer is based on how much was used in the previous buffer.
-                        int prev_size = Math.min(Math.max(outbound_buffer.position()+512, 512), bufferSize);
-                        outbound_buffer = next_outbound_buffer.toBuffer().toByteBuffer();
-                        next_outbound_buffer = new DataByteArrayOutputStream(prev_size);
-                    } else {
-                        if( full ) {
-                            full = false;
-                            listener.onRefill();
-                            // If the listener did not have anything for us...
-                            if (next_outbound_buffer.size() == 0) {
-                                // the source is now drained...
-                                return true;
-                            }
-                        } else {
-                            return true;
-                        }
-                    }
-                }
+            if( wireformat.flush() == WireFormat.BufferState.EMPTY ) {
+                writeSource.suspend();
+                listener.onRefill();
             }
-
         } catch (IOException e) {
             onTransportFailure(e);
-            return true;
         }
-        return outbound_buffer.remaining() == 0;
     }
 
-    private void drainInbound() throws IOException {
+    private void drainInbound() {
         if (!getServiceState().isStarted() || readSource.isSuspended()) {
             return;
         }
-        while (true) {
-
-            // do we need to read in more data???
-            if (this.wireformat.unmarshalEndPos() == readBuffer.position()) {
-
-                // do we need a new data buffer to read data into??
-                if (readBuffer.remaining() == 0) {
-
-                    // How much data is still not consumed by the wireformat
-                    int size = this.wireformat.unmarshalEndPos() - this.wireformat.unmarshalStartPos();
-
-                    int new_capacity = this.wireformat.unmarshalStartPos() == 0 ? size+bufferSize : (size > bufferSize ? size+bufferSize : bufferSize);
-                    byte[] new_buffer = new byte[new_capacity];
-
-                    if (size > 0) {
-                        System.arraycopy(readBuffer.array(), this.wireformat.unmarshalStartPos(), new_buffer, 0, size);
-                    }
-
-                    readBuffer = ByteBuffer.wrap(new_buffer);
-                    readBuffer.position(size);
-                    this.wireformat.unmarshalStartPos(0);
-                    this.wireformat.unmarshalEndPos(size);
-                }
-
-                // Try to fill the buffer with data from the socket..
-                int p = readBuffer.position();
-                int count = channel.read(readBuffer);
-                if (count == -1) {
-                    throw new EOFException("Peer disconnected");
-                } else if (count == 0) {
-                    return;
+        try {
+            Object command = wireformat.read();
+            while ( command!=null ) {
+                try {
+                    listener.onTransportCommand(command);
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    onTransportFailure(new IOException("Transport listener failure."));
                 }
-                readCounter += count;
-            }
-
-            Object command = this.wireformat.unmarshalNB(readBuffer);
-
-            // Sanity checks to make sure the wireformat is behaving as expected.
-            assert wireformat.unmarshalStartPos() <= wireformat.unmarshalEndPos();
-            assert wireformat.unmarshalEndPos() <= readBuffer.position();
-
-            if (command != null) {
-                listener.onTransportCommand(command);
 
                 // the transport may be suspended after processing a command.
                 if (getServiceState() == STOPPED || readSource.isSuspended()) {
                     return;
                 }
-            }
 
+                command = wireformat.read();
+            }
+        } catch (IOException e) {
+            onTransportFailure(e);
         }
     }
 
@@ -508,21 +426,6 @@ public class TcpTransport extends BaseSe
         return remoteAddress;
     }
 
-    /**
-     * @return The number of bytes sent by the transport.
-     */
-    public long getWriteCounter() {
-        return writeCounter;
-    }
-
-    /**
-     * @return The number of bytes received by the transport.
-     */
-    public long getReadCounter() {
-        return readCounter;
-    }
-
-
     public <T> T narrow(Class<T> target) {
         if (target.isAssignableFrom(getClass())) {
             return target.cast(this);
@@ -552,6 +455,11 @@ public class TcpTransport extends BaseSe
     public void resumeRead() {
         if( isConnected() && readSource!=null ) {
             readSource.resume();
+            dispatchQueue.execute(new Runnable(){
+                public void run() {
+                    drainInbound();
+                }
+            });
         }
     }
 
@@ -577,6 +485,10 @@ public class TcpTransport extends BaseSe
 
     public void setWireformat(WireFormat wireformat) {
         this.wireformat = wireformat;
+        if( channel!=null ) {
+            wireformat.setReadableByteChannel(this.channel);
+            wireformat.setWritableByteChannel(this.channel);
+        }
     }
 
     public boolean isConnected() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul  7 04:17:05 2010
@@ -92,16 +92,6 @@ public interface Transport extends Servi
     String getRemoteAddress();
 
     /**
-     * @return The number of bytes sent by the transport.
-     */
-    long getWriteCounter();
-
-    /**
-     * @return The number of bytes received by the transport.
-     */
-    long getReadCounter();
-
-    /**
      * Indicates if the transport can handle faults
      * 
      * @return true if fault tolerant

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul  7 04:17:05 2010
@@ -149,14 +149,6 @@ public class TransportFilter implements 
         return next.getRemoteAddress();
     }
 
-    public long getReadCounter() {
-        return next.getReadCounter();
-    }
-
-    public long getWriteCounter() {
-        return next.getWriteCounter();
-    }
-
     /**
      * @return
      * @see org.apache.activemq.transport.Transport#isFaultTolerant()

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul  7 04:17:05 2010
@@ -190,14 +190,14 @@ public class PipeTransport implements Tr
     }
 
     /**
-     * @return The number of bytes sent by the transport.
+     * @return The number of objects sent by the transport.
      */
     public long getWriteCounter() {
         return writeCounter;
     }
 
     /**
-     * @return The number of bytes received by the transport.
+     * @return The number of objects received by the transport.
      */
     public long getReadCounter() {
         return readCounter;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul  7 04:17:05 2010
@@ -23,6 +23,8 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -50,117 +52,73 @@ public class MultiWireFormatFactory impl
         ArrayList<WireFormatFactory> wireFormatFactories = new ArrayList<WireFormatFactory>();
         WireFormat wireFormat;
         int maxHeaderLength;
-        int start=0;
-        int end=0;
+        private ReadableByteChannel readableChannel;
+        private ByteBuffer buffer;
 
+        public void setReadableByteChannel(ReadableByteChannel readableChannel) {
+            this.readableChannel = readableChannel;
+            this.buffer = ByteBuffer.allocate(maxHeaderLength);
+        }
 
-        private ByteArrayInputStream peeked;
-
-        public Object unmarshal(DataInput in) throws IOException {
-
-            if (wireFormat == null) {
-                ByteArrayOutputStream baos = new ByteArrayOutputStream(maxHeaderLength);
-                while (wireFormat == null) {
-    
-                    int readByte = ((InputStream) in).read();
-                    if (readByte < 0) {
-                        throw new EOFException();
-                    }
-                    baos.write(readByte);
-    
-                    // Try to discriminate what we have read so far.
-                    for (WireFormatFactory wff : wireFormatFactories) {
-                        if (wff.matchesWireformatHeader(baos.toBuffer())) {
-                            wireFormat = wff.createWireFormat();
-                            break;
-                        }
-                    }
-    
-                    if (baos.size() >= maxHeaderLength && wireFormat==null) {
-                        throw new IOException("Could not discriminate the protocol.");
-                    }
-                }
-                peeked = new ByteArrayInputStream(baos.toBuffer());
-                return wireFormat;
+        public Object read() throws IOException {
+            if( wireFormat!=null ) {
+                throw new IOException("Protocol already discriminated.");
             }
+            readableChannel.read(buffer);
 
-            // If we have some peeked data we need to feed that back..  Only happens
-            // for the first few bytes of the protocol header.
-            if (peeked != null) {
-                if (peeked.available() <= 0) {
-                    peeked = null;
-                } else {
-                    in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in));
+            Buffer b = new Buffer(buffer.array(), 0, buffer.position());
+            for (WireFormatFactory wff : wireFormatFactories) {
+                if (wff.matchesWireformatHeader( b )) {
+                    wireFormat = wff.createWireFormat();
+                    wireFormat.unread(b);
+                    return wireFormat;
                 }
             }
 
-            Object rc = wireFormat.unmarshal(in);
-            return rc;
-        }
-
-        public int unmarshalStartPos() {
-            if( wireFormat!=null ) {
-                return wireFormat.unmarshalStartPos();
-            } else {
-                return start;
+            if( buffer.position() >= maxHeaderLength ) {
+                throw new IOException("Could not discriminate the protocol.");
             }
+            return null;
         }
 
-        public void unmarshalStartPos(int pos) {
-            if( wireFormat!=null ) {
-                wireFormat.unmarshalStartPos(pos);
-            } else {
-                start=pos;
-            }
+        public void unread(Buffer buffer) {
+            throw new UnsupportedOperationException();
         }
 
-        public int unmarshalEndPos() {
-            if( wireFormat!=null ) {
-                return wireFormat.unmarshalEndPos();
-            } else {
-                return end;
-            }
+        public long getReadCounter() {
+            return buffer.position();
         }
 
-        public void unmarshalEndPos(int pos) {
-            if( wireFormat!=null ) {
-                wireFormat.unmarshalEndPos(pos);
-            } else {
-                end = pos;
-            }
+        public void setWritableByteChannel(WritableByteChannel writableChannel) {
         }
 
-        public Object unmarshalNB(ByteBuffer buffer) throws IOException {
-            if( wireFormat!=null ) {
-                return wireFormat.unmarshalNB(buffer);
-            }
+        public BufferState write(Object value) throws IOException {
+            throw new UnsupportedOperationException();
+        }
 
-            Buffer b = new Buffer(buffer.array(), start, buffer.position());
-            for (WireFormatFactory wff : wireFormatFactories) {
-                if (wff.matchesWireformatHeader( b )) {
-                    wireFormat = wff.createWireFormat();
-                    wireFormat.unmarshalStartPos(start);
-                    wireFormat.unmarshalEndPos(end);
-                    return wireFormat;
-                }
-            }
+        public BufferState flush() throws IOException {
+            throw new UnsupportedOperationException();
+        }
 
-            if( end >= maxHeaderLength ) {
-                throw new IOException("Could not discriminate the protocol.");
-            }
-            return null;
+        public long getWriteCounter() {
+            return 0;
         }
 
+
         public void marshal(Object command, DataOutput out) throws IOException {
-            wireFormat.marshal(command, out);
+            throw new UnsupportedOperationException();
         }
 
         public Buffer marshal(Object command) throws IOException {
-            return wireFormat.marshal(command);
+            throw new UnsupportedOperationException();
         }
 
         public Object unmarshal(Buffer packet) throws IOException {
-            return wireFormat.marshal(packet);
+            throw new UnsupportedOperationException();
+        }
+
+        public Object unmarshal(DataInput in) throws IOException {
+            throw new UnsupportedOperationException();
         }
 
         public ArrayList<WireFormatFactory> getWireFormatFactories() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Wed Jul  7 04:17:05 2010
@@ -25,6 +25,8 @@ import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 
 import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
 import org.fusesource.hawtbuf.Buffer;
@@ -71,23 +73,35 @@ public class ObjectStreamWireFormat impl
         }
     }
 
-    public int unmarshalStartPos() {
+    public void setReadableByteChannel(ReadableByteChannel channel) {
         throw new UnsupportedOperationException();
     }
 
-    public void unmarshalStartPos(int pos) {
+    public Object read() throws IOException {
         throw new UnsupportedOperationException();
     }
 
-    public int unmarshalEndPos() {
+    public void unread(Buffer buffer) {
         throw new UnsupportedOperationException();
     }
 
-    public void unmarshalEndPos(int pos) {
+    public long getReadCounter() {
         throw new UnsupportedOperationException();
     }
 
-    public Object unmarshalNB(ByteBuffer buffer) throws IOException {
+    public void setWritableByteChannel(WritableByteChannel channel) {
+        throw new UnsupportedOperationException();
+    }
+
+    public BufferState write(Object value) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    public BufferState flush() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    public long getWriteCounter() {
         throw new UnsupportedOperationException();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Wed Jul  7 04:17:05 2010
@@ -20,6 +20,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 
 import org.fusesource.hawtbuf.Buffer;
 
@@ -32,6 +34,13 @@ import org.fusesource.hawtbuf.Buffer;
  */
 public interface WireFormat {
 
+    enum BufferState {
+        EMPTY,
+        WAS_EMPTY,
+        NOT_EMPTY,
+        FULL,
+    }
+
     /**
      * @return The name of the wireformat
      */
@@ -57,18 +66,67 @@ public interface WireFormat {
      */
     Object unmarshal(DataInput in) throws IOException;
 
-    int unmarshalStartPos();
-    void unmarshalStartPos(int pos);
+    /**
+     * @param channel
+     */
+    public void setReadableByteChannel(ReadableByteChannel channel);
 
-    int unmarshalEndPos();
-    void unmarshalEndPos(int pos);
+    /**
+     * Non-blocking channel based decoding.
+     * 
+     * @return
+     * @throws IOException
+     */
+    Object read() throws IOException;
 
     /**
-     * For a unmarshal session is used for non-blocking
-     * unmarshalling.
+     * Pushes back a buffer as being unread.  The protocol
+     * discriminator may do this before any reads occur.
+     *
+     * @param buffer
      */
-    Object unmarshalNB(ByteBuffer buffer) throws IOException;
+    void unread(Buffer buffer);
+
+
+    /**
+     * @return The number of bytes received.
+     */
+    public long getReadCounter();
+
+
+    public void setWritableByteChannel(WritableByteChannel channel);
+
+    /**
+     * Non-blocking channel based encoding.
+     *
+     * @return the BufferState resulting from the write attempt.
+     * @throws IOException
+     */
+    BufferState write(Object value) throws IOException;
+
+    /**
+     * Attempts to complete the previous write which did not complete.
+     * @return
+     * @throws IOException
+     */
+    BufferState flush() throws IOException;
+
+    /**
+     * @return The number of bytes written.
+     */
+    public long getWriteCounter() ;
+
 
+//    void unmarshalStartPos(int pos);
+//
+//    int unmarshalEndPos();
+//    void unmarshalEndPos(int pos);
+//
+//    /**
+//     * For a unmarshal session is used for non-blocking
+//     * unmarshalling.
+//     */
+//    Object unmarshalNB(ByteBuffer buffer) throws IOException;
 
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java Wed Jul  7 04:17:05 2010
@@ -4,6 +4,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 
 import org.fusesource.hawtbuf.Buffer;
 import org.apache.activemq.wireformat.WireFormat;
@@ -33,26 +35,36 @@ public class MockWireFormatFactory imple
 	        throw new UnsupportedOperationException();
 		}
 
-        public int unmarshalStartPos() {
+        public void setReadableByteChannel(ReadableByteChannel channel) {
             throw new UnsupportedOperationException();
         }
 
-        public void unmarshalStartPos(int pos) {
+        public Object read() throws IOException {
             throw new UnsupportedOperationException();
         }
 
-        public int unmarshalEndPos() {
+        public void unread(Buffer buffer) {
             throw new UnsupportedOperationException();
         }
 
-        public void unmarshalEndPos(int pos) {
+        public long getReadCounter() {
             throw new UnsupportedOperationException();
         }
 
-        public Object unmarshalNB(ByteBuffer buffer) throws IOException {
+        public void setWritableByteChannel(WritableByteChannel channel) {
             throw new UnsupportedOperationException();
         }
 
+        public BufferState write(Object value) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+        public BufferState flush() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getWriteCounter() {
+            throw new UnsupportedOperationException();
+        }
     }
 
 	public WireFormat createWireFormat() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=961195&r1=961194&r2=961195&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Jul  7 04:17:05 2010
@@ -252,8 +252,11 @@ case class RuntimeResource(parent:Broker
             result.protocol = connection.protocol
             result.transport = connection.transport.getTypeId
             result.remoteAddress = connection.transport.getRemoteAddress
-            result.writeCounter = connection.transport.getWriteCounter
-            result.readCounter = connection.transport.getReadCounter
+            val wf = connection.transport.getWireformat
+            if( wf!=null ) {
+              result.writeCounter = wf.getWriteCounter
+              result.readCounter = wf.getReadCounter
+            }
             cb(Some(result))
           }
       }



Mime
View raw message