Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4BC289979 for ; Mon, 30 Jan 2012 00:45:54 +0000 (UTC) Received: (qmail 39657 invoked by uid 500); 30 Jan 2012 00:45:54 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 39581 invoked by uid 500); 30 Jan 2012 00:45:53 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 39574 invoked by uid 99); 30 Jan 2012 00:45:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Jan 2012 00:45:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Jan 2012 00:45:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DF384238889B for ; Mon, 30 Jan 2012 00:45:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1237514 - in /activemq/activemq-apollo/trunk: apollo-cli/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ Date: Mon, 30 Jan 2012 00:45:31 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120130004531.DF384238889B@eris.apache.org> Author: chirino Date: Mon Jan 30 00:45:31 2012 New Revision: 1237514 URL: http://svn.apache.org/viewvc?rev=1237514&view=rev Log: Allow openwire messages to get persisted. Modified: activemq/activemq-apollo/trunk/apollo-cli/pom.xml activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java Modified: activemq/activemq-apollo/trunk/apollo-cli/pom.xml URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/pom.xml?rev=1237514&r1=1237513&r2=1237514&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/pom.xml (original) +++ activemq/activemq-apollo/trunk/apollo-cli/pom.xml Mon Jan 30 00:45:31 2012 @@ -135,6 +135,12 @@ 1.1-SNAPSHOT test + + org.apache.activemq + apollo-openwire + 1.1-SNAPSHOT + test + org.apache.activemq Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1237514&r1=1237513&r2=1237514&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Mon Jan 30 00:45:31 2012 @@ -23,10 +23,13 @@ import OpenwireConstants._ import java.nio.ByteBuffer import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel} import java.io.EOFException -import org.fusesource.hawtbuf.{BufferEditor, DataByteArrayOutputStream, Buffer} import org.apache.activemq.apollo.broker.{Sizer, Message} import org.apache.activemq.apollo.openwire.codec.OpenWireFormat import org.apache.activemq.apollo.openwire.command._ +import org.apache.activemq.apollo.broker.BufferConversions._ +import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, AbstractVarIntSupport, Buffer} + +case class CachedEncoding(tight:Boolean, version:Int, buffer:Buffer) /** *

@@ -35,11 +38,64 @@ import org.apache.activemq.apollo.openwi * @author Hiram Chirino */ object OpenwireCodec extends Sizer[Command] { + + final val DB_VERSION = OpenWireFormat.DEFAULT_VERSION + final val DB_TIGHT_ENCODING = true + def encode(message: Message):MessageRecord = { - throw new UnsupportedOperationException + val rc = new MessageRecord + rc.protocol = PROTOCOL + rc.expiration = message.expiration + + val msg = message.asInstanceOf[OpenwireMessage]; + rc.buffer = msg.message.getCachedEncoding match { + case CachedEncoding(tight, version, buffer) => + + val boas = new DataByteArrayOutputStream( + 1 + + AbstractVarIntSupport.computeVarIntSize(version)+ + buffer.length() + ) + + boas.writeBoolean(tight) + boas.writeVarInt(version) + boas.write(buffer) + boas.toBuffer + + case _ => + + val db_format = new OpenWireFormat(); + db_format.setCacheEnabled(false) + db_format.setTightEncodingEnabled(DB_TIGHT_ENCODING) + db_format.setVersion(DB_VERSION) + + val size = msg.message.getEncodedSize + val boas = new DataByteArrayOutputStream(if(size==0) 1024 else size + 20) + boas.writeBoolean(DB_TIGHT_ENCODING) + boas.writeVarInt(DB_VERSION) + db_format.marshal(msg.message, boas); + boas.toBuffer + + } + rc } + def decode(message: MessageRecord) = { - throw new UnsupportedOperationException + val buffer = message.buffer.buffer(); + val bais = new DataByteArrayInputStream(message.buffer) + var tight: Boolean = bais.readBoolean() + var version: Int = bais.readVarInt() + buffer.moveHead(bais.getPos-buffer.offset) + + val db_format = new OpenWireFormat(); + db_format.setCacheEnabled(false) + db_format.setTightEncodingEnabled(tight) + db_format.setVersion(version) + + val msg = db_format.unmarshal(bais).asInstanceOf[ActiveMQMessage] + msg.setEncodedSize(buffer.length) + msg.setCachedEncoding(CachedEncoding(tight, version, buffer)) + new OpenwireMessage(msg) } def size(value: Command) = { @@ -63,8 +119,7 @@ class OpenwireCodec extends ProtocolCode var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size) var write_buffer = ByteBuffer.allocate(0) - val format = new OpenWireFormat(); - + val format = new OpenWireFormat(1); def full = next_write_buffer.size() >= (write_buffer_size >> 1) def is_empty = write_buffer.remaining() == 0 @@ -84,8 +139,21 @@ class OpenwireCodec extends ProtocolCode } else { val was_empty = is_empty command match { - case frame:Command=> - format.marshal(frame, next_write_buffer) + case command:ActiveMQMessage=> + command.getCachedEncoding match { + case CachedEncoding(tight, version, buffer) => + // We might be able to re-use the origin format of the message. + if( !format.isCacheEnabled && format.isTightEncodingEnabled==tight && format.getVersion==version ) { + next_write_buffer.write(buffer) + } else { + format.marshal(command, next_write_buffer) + } + case _ => + format.marshal(command, next_write_buffer) + } + + case command:Command=> + format.marshal(command, next_write_buffer) } if( was_empty ) { ProtocolCodec.BufferState.WAS_EMPTY @@ -199,7 +267,19 @@ class OpenwireCodec extends ProtocolCode read_waiting_on += 4 next_action = read_header - rc.asInstanceOf[Command] + var command: Command = rc.asInstanceOf[Command] + + // If value caching is NOT enabled, then we potentially re-use the encode + // value of the message. + command match { + case message:ActiveMQMessage => + message.setEncodedSize(size) + if( !format.isCacheEnabled ) { + message.setCachedEncoding(CachedEncoding(format.isTightEncodingEnabled, format.getVersion, buf)) + } + case _ => + } + command } def getLastWriteSize = 0 Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1237514&r1=1237513&r2=1237514&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jan 30 00:45:31 2012 @@ -50,7 +50,7 @@ object OpenwireProtocolHandler extends L val preferred_wireformat_settings = new WireFormatInfo(); preferred_wireformat_settings.setVersion(OpenWireFormat.DEFAULT_VERSION); preferred_wireformat_settings.setStackTraceEnabled(true); - preferred_wireformat_settings.setCacheEnabled(true); + preferred_wireformat_settings.setCacheEnabled(false); preferred_wireformat_settings.setTcpNoDelayEnabled(true); preferred_wireformat_settings.setTightEncodingEnabled(true); preferred_wireformat_settings.setSizePrefixDisabled(false); @@ -65,6 +65,7 @@ object OpenwireProtocolHandler extends L */ class OpenwireProtocolHandler extends ProtocolHandler { + var connection_log:Log = OpenwireProtocolHandler var minimum_protocol_version = 1 import OpenwireProtocolHandler._ @@ -172,14 +173,14 @@ class OpenwireProtocolHandler extends Pr override def on_transport_failure(error: IOException) = { if (!connection.stopped) { - error.printStackTrace suspend_read("shutdown") - debug(error, "Shutting connection down due to: %s", error) + connection_log.info(error, "Shutting connection '%s' down due to: %s", security_context.remote_address, error) connection.stop } } override def on_transport_connected():Unit = { + connection_log = connection.connector.broker.connection_log security_context.local_address = connection.transport.getLocalAddress security_context.remote_address = connection.transport.getRemoteAddress @@ -197,6 +198,7 @@ class OpenwireProtocolHandler extends Pr reset { suspend_read("virtual host lookup") this.host = broker.get_default_virtual_host + connection_log = this.host.connection_log resume_read if(host==null) { async_die("Could not find default virtual host") @@ -354,7 +356,8 @@ class OpenwireProtocolHandler extends Pr def die[T](msg: String, actual:Command=null):T = { if (!dead) { dead = true - debug("Shutting connection down due to: " + msg) + + connection_log.info("OpenWire connection '%s' error: %s", security_context.remote_address, msg) // TODO: if there are too many open connections we should just close the connection // without waiting for the error to get sent to the client. queue.after(die_delay, TimeUnit.MILLISECONDS) { @@ -642,7 +645,13 @@ class OpenwireProtocolHandler extends Pr // We may need to add some headers.. val delivery = new Delivery delivery.message = new OpenwireMessage(message) - delivery.size = message.getSize + delivery.size = { + val rc = message.getEncodedSize + if( rc != 0 ) + rc + else + message.getSize + } delivery.uow = uow if( message.isResponseRequired ) { Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java?rev=1237514&r1=1237513&r2=1237514&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java Mon Jan 30 00:45:31 2012 @@ -89,6 +89,22 @@ public abstract class Message extends Ba public abstract org.apache.activemq.apollo.openwire.command.Message copy(); public abstract void clearBody() throws OpenwireException; + protected int encodedSize; + public int getEncodedSize() { + return encodedSize; + } + public void setEncodedSize(int encodedSize) { + this.encodedSize = encodedSize; + } + + protected Object cachedEncoding; + public Object getCachedEncoding() { + return cachedEncoding; + } + public void setCachedEncoding(Object cachedEncoding) { + this.cachedEncoding = cachedEncoding; + } + // useful to reduce the memory footprint of a persisted message public void clearMarshalledState() { properties = null;