activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1237514 - in /activemq/activemq-apollo/trunk: apollo-cli/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/
Date Mon, 30 Jan 2012 00:45:31 GMT
Author: chirino
Date: Mon Jan 30 00:45:31 2012
New Revision: 1237514

URL: http://svn.apache.org/viewvc?rev=1237514&view=rev
Log:
Allow openwire messages to get persisted.

Modified:
    activemq/activemq-apollo/trunk/apollo-cli/pom.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java

Modified: activemq/activemq-apollo/trunk/apollo-cli/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/pom.xml?rev=1237514&r1=1237513&r2=1237514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/pom.xml Mon Jan 30 00:45:31 2012
@@ -135,6 +135,12 @@
       <version>1.1-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-openwire</artifactId>
+      <version>1.1-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.activemq</groupId>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1237514&r1=1237513&r2=1237514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Mon Jan 30 00:45:31 2012
@@ -23,10 +23,13 @@ import OpenwireConstants._
 import java.nio.ByteBuffer
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 import java.io.EOFException
-import org.fusesource.hawtbuf.{BufferEditor, DataByteArrayOutputStream, Buffer}
 import org.apache.activemq.apollo.broker.{Sizer, Message}
 import org.apache.activemq.apollo.openwire.codec.OpenWireFormat
 import org.apache.activemq.apollo.openwire.command._
+import org.apache.activemq.apollo.broker.BufferConversions._
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, AbstractVarIntSupport, Buffer}
+
+case class CachedEncoding(tight:Boolean, version:Int, buffer:Buffer) 
 
 /**
  * <p>
@@ -35,11 +38,64 @@ import org.apache.activemq.apollo.openwi
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object OpenwireCodec extends Sizer[Command] {
+
+  final val DB_VERSION = OpenWireFormat.DEFAULT_VERSION
+  final val DB_TIGHT_ENCODING = true
+
   def encode(message: Message):MessageRecord = {
-    throw new UnsupportedOperationException
+    val rc = new MessageRecord
+    rc.protocol = PROTOCOL
+    rc.expiration = message.expiration
+
+    val msg = message.asInstanceOf[OpenwireMessage];
+    rc.buffer = msg.message.getCachedEncoding match {
+      case CachedEncoding(tight, version, buffer) =>
+
+        val boas = new DataByteArrayOutputStream(
+          1 +
+          AbstractVarIntSupport.computeVarIntSize(version)+
+          buffer.length()
+        )
+
+        boas.writeBoolean(tight)
+        boas.writeVarInt(version)
+        boas.write(buffer)
+        boas.toBuffer
+
+      case _ =>
+
+        val db_format = new OpenWireFormat();
+        db_format.setCacheEnabled(false)
+        db_format.setTightEncodingEnabled(DB_TIGHT_ENCODING)
+        db_format.setVersion(DB_VERSION)
+
+        val size = msg.message.getEncodedSize
+        val boas = new DataByteArrayOutputStream(if(size==0) 1024 else size + 20)
+        boas.writeBoolean(DB_TIGHT_ENCODING)
+        boas.writeVarInt(DB_VERSION)
+        db_format.marshal(msg.message, boas);
+        boas.toBuffer
+
+    }
+    rc
   }
+  
   def decode(message: MessageRecord) = {
-    throw new UnsupportedOperationException
+    val buffer = message.buffer.buffer();
+    val bais = new DataByteArrayInputStream(message.buffer)
+    var tight: Boolean = bais.readBoolean()
+    var version: Int = bais.readVarInt()
+    buffer.moveHead(bais.getPos-buffer.offset)
+
+    val db_format = new OpenWireFormat();
+    db_format.setCacheEnabled(false)
+    db_format.setTightEncodingEnabled(tight)
+    db_format.setVersion(version)
+
+    val msg = db_format.unmarshal(bais).asInstanceOf[ActiveMQMessage]
+    msg.setEncodedSize(buffer.length)
+    msg.setCachedEncoding(CachedEncoding(tight, version, buffer))
+    new OpenwireMessage(msg)
   }
 
   def size(value: Command) = {
@@ -63,8 +119,7 @@ class OpenwireCodec extends ProtocolCode
   var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
   var write_buffer = ByteBuffer.allocate(0)
 
-  val format = new OpenWireFormat();
-
+  val format = new OpenWireFormat(1);
 
   def full = next_write_buffer.size() >= (write_buffer_size >> 1)
   def is_empty = write_buffer.remaining() == 0
@@ -84,8 +139,21 @@ class OpenwireCodec extends ProtocolCode
     } else {
       val was_empty = is_empty
       command match {
-        case frame:Command=>
-          format.marshal(frame, next_write_buffer)
+        case command:ActiveMQMessage=>
+          command.getCachedEncoding match {
+            case CachedEncoding(tight, version, buffer) =>
+              // We might be able to re-use the origin format of the message.
+              if( !format.isCacheEnabled && format.isTightEncodingEnabled==tight && format.getVersion==version ) {
+                next_write_buffer.write(buffer)
+              } else {
+                format.marshal(command, next_write_buffer)
+              }
+            case _ =>
+              format.marshal(command, next_write_buffer)
+          }
+          
+        case command:Command=>
+          format.marshal(command, next_write_buffer)
       }
       if( was_empty ) {
         ProtocolCodec.BufferState.WAS_EMPTY
@@ -199,7 +267,19 @@ class OpenwireCodec extends ProtocolCode
 
     read_waiting_on += 4
     next_action = read_header
-    rc.asInstanceOf[Command]
+    var command: Command = rc.asInstanceOf[Command]
+    
+    // If value caching is NOT enabled, then we potentially re-use the encode
+    // value of the message.
+    command match {
+      case message:ActiveMQMessage =>
+        message.setEncodedSize(size)
+        if( !format.isCacheEnabled ) {
+          message.setCachedEncoding(CachedEncoding(format.isTightEncodingEnabled, format.getVersion, buf))
+        }
+      case _ =>
+    }
+    command
   }
 
   def getLastWriteSize = 0

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1237514&r1=1237513&r2=1237514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jan 30 00:45:31 2012
@@ -50,7 +50,7 @@ object OpenwireProtocolHandler extends L
   val preferred_wireformat_settings = new WireFormatInfo();
   preferred_wireformat_settings.setVersion(OpenWireFormat.DEFAULT_VERSION);
   preferred_wireformat_settings.setStackTraceEnabled(true);
-  preferred_wireformat_settings.setCacheEnabled(true);
+  preferred_wireformat_settings.setCacheEnabled(false);
   preferred_wireformat_settings.setTcpNoDelayEnabled(true);
   preferred_wireformat_settings.setTightEncodingEnabled(true);
   preferred_wireformat_settings.setSizePrefixDisabled(false);
@@ -65,6 +65,7 @@ object OpenwireProtocolHandler extends L
  */
 class OpenwireProtocolHandler extends ProtocolHandler {
 
+  var connection_log:Log = OpenwireProtocolHandler
   var minimum_protocol_version = 1
 
   import OpenwireProtocolHandler._
@@ -172,14 +173,14 @@ class OpenwireProtocolHandler extends Pr
 
   override def on_transport_failure(error: IOException) = {
     if (!connection.stopped) {
-      error.printStackTrace
       suspend_read("shutdown")
-      debug(error, "Shutting connection down due to: %s", error)
+      connection_log.info(error, "Shutting connection '%s'  down due to: %s", security_context.remote_address, error)
       connection.stop
     }
   }
 
   override def on_transport_connected():Unit = {
+    connection_log = connection.connector.broker.connection_log
     security_context.local_address = connection.transport.getLocalAddress
     security_context.remote_address = connection.transport.getRemoteAddress
 
@@ -197,6 +198,7 @@ class OpenwireProtocolHandler extends Pr
     reset {
       suspend_read("virtual host lookup")
       this.host = broker.get_default_virtual_host
+      connection_log = this.host.connection_log
       resume_read
       if(host==null) {
         async_die("Could not find default virtual host")
@@ -354,7 +356,8 @@ class OpenwireProtocolHandler extends Pr
   def die[T](msg: String, actual:Command=null):T = {
     if (!dead) {
       dead = true
-      debug("Shutting connection down due to: " + msg)
+
+      connection_log.info("OpenWire connection '%s' error: %s", security_context.remote_address, msg)
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -642,7 +645,13 @@ class OpenwireProtocolHandler extends Pr
       // We may need to add some headers..
       val delivery = new Delivery
       delivery.message = new OpenwireMessage(message)
-      delivery.size = message.getSize
+      delivery.size = {
+        val rc = message.getEncodedSize
+        if( rc != 0 )
+          rc
+        else
+          message.getSize
+      }
       delivery.uow = uow
 
       if( message.isResponseRequired ) {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java?rev=1237514&r1=1237513&r2=1237514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java Mon Jan 30 00:45:31 2012
@@ -89,6 +89,22 @@ public abstract class Message extends Ba
     public abstract org.apache.activemq.apollo.openwire.command.Message copy();
     public abstract void clearBody() throws OpenwireException;
 
+    protected int encodedSize;
+    public int getEncodedSize() {
+        return encodedSize;
+    }
+    public void setEncodedSize(int encodedSize) {
+        this.encodedSize = encodedSize;
+    }
+
+    protected Object cachedEncoding;
+    public Object getCachedEncoding() {
+        return cachedEncoding;
+    }
+    public void setCachedEncoding(Object cachedEncoding) {
+        this.cachedEncoding = cachedEncoding;
+    }
+
     // useful to reduce the memory footprint of a persisted message
     public void clearMarshalledState() {
         properties = null;



Mime
View raw message