Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 21997 invoked from network); 13 Mar 2009 09:04:53 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 13 Mar 2009 09:04:53 -0000 Received: (qmail 19447 invoked by uid 500); 13 Mar 2009 09:04:53 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 19425 invoked by uid 500); 13 Mar 2009 09:04:53 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 19416 invoked by uid 99); 13 Mar 2009 09:04:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2009 02:04:53 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2009 09:04:45 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5DE2323888CA; Fri, 13 Mar 2009 09:04:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r753178 - in /activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze: ./ group/ impl/reliable/simple/ jms/ jms/message/ Date: Fri, 13 Mar 2009 09:04:24 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090313090425.5DE2323888CA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Fri Mar 13 09:04:23 2009 New Revision: 753178 URL: http://svn.apache.org/viewvc?rev=753178&view=rev Log: remove deprecated code Removed: 
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=753178&r1=753177&r2=753178&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Fri Mar 13 09:04:23 2009 @@ -19,7 +19,6 @@ import java.net.URI; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; - import org.apache.activeblaze.impl.network.Network; import org.apache.activeblaze.impl.network.NetworkFactory; import org.apache.activeblaze.impl.processor.ChainedProcessor; @@ -38,6 +37,7 @@ import org.apache.activeblaze.wire.PacketData.PacketDataBuffer; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.MessageBuffer; +import org.apache.activemq.protobuf.UTF8Buffer; /** *

* A BlazeChannel handles all client communication, either unicast, @@ -185,14 +185,7 @@ } public synchronized void broadcast(Destination destination, BlazeMessage msg) throws Exception { - msg.setDestination(destination); - msg.storeContent(); - BlazeDataBuffer blazeData = msg.getContent().freeze(); - PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData); - packetData.setReliable(true); - packetData.setDestinationData(destination.getData()); - packetData.setPayloadType(msg.getType()); - Packet packet = new Packet(packetData.freeze()); + Packet packet = buildPacket(destination, msg); this.broadcast.downStream(packet); } @@ -229,19 +222,19 @@ public void setConfiguration(BlazeConfiguration configuration) { this.configuration = configuration; } - /** * @return the blazeMessageProcessor */ - public BlazeMessageProcessor getBlazeMessageProcessor(){ + public BlazeMessageProcessor getBlazeMessageProcessor() { return this.blazeMessageProcessor; } /** - * @param blazeMessageProcessor the blazeMessageProcessor to set + * @param blazeMessageProcessor + * the blazeMessageProcessor to set */ - public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor){ + public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor) { this.blazeMessageProcessor = blazeMessageProcessor; } @@ -261,29 +254,28 @@ BlazeMessage result = null; if (this.blazeMessageProcessor != null) { result = this.blazeMessageProcessor.processBlazeMessage(data); - }else { - - if (data != null) { - DestinationData destination = data.getDestinationData(); - Buffer payload = data.getPayload(); - BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload); - String fromId = null; - if (data.hasProducerId()) { - fromId = data.getProducerId().toStringUtf8(); - } - result = createMessage(fromId); - result.setDestination(destination); - result.setFromId(fromId); - if (data.hasMessageId()) { - result.setMessageId(data.getMessageId().toStringUtf8()); - } - 
if (data.hasCorrelationId()) { - result.setCorrelationId(data.getCorrelationId().toStringUtf8()); + } else { + if (data != null) { + DestinationData destination = data.getDestinationData(); + Buffer payload = data.getPayload(); + BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload); + String fromId = null; + if (data.hasProducerId()) { + fromId = data.getProducerId().toStringUtf8(); + } + result = createMessage(fromId); + result.setDestination(destination); + result.setFromId(fromId); + if (data.hasMessageId()) { + result.setMessageId(data.getMessageId().toStringUtf8()); + } + if (data.hasCorrelationId()) { + result.setCorrelationId(data.getCorrelationId().toStringUtf8()); + } + result.setTimeStamp(blazeData.getTimestamp()); + result.setType(data.getPayloadType()); + result.setContent(blazeData); } - result.setTimeStamp(blazeData.getTimestamp()); - result.setType(data.getPayloadType()); - result.setContent(blazeData); - } } return result; } @@ -306,4 +298,33 @@ } } } + + protected final Packet buildPacket(Destination destination, BlazeMessage message) { + return buildPacket(destination, message, false,null); + } + + protected final Packet buildPacket(Destination destination, BlazeMessage message,String correlationId) { + return buildPacket(destination, message, false,correlationId); + } + + protected final Packet buildPacket(Destination destination, BlazeMessage message,boolean responseRequired) { + return buildPacket(destination, message, responseRequired,null); + } + + protected final Packet buildPacket(Destination destination, BlazeMessage message, boolean responseRequired,String correlationId) { + message.setDestination(destination); + message.storeContent(); + + BlazeDataBuffer blazeData = message.getContent().freeze(); + PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData); + packetData.setReliable(true); + packetData.setResponseRequired(responseRequired); + if (correlationId != null && correlationId.length() > 0){ + 
packetData.setCorrelationId(new UTF8Buffer(correlationId)); + } + packetData.setDestinationData(destination.getData()); + packetData.setPayloadType(message.getType()); + Packet packet = new Packet(packetData.freeze()); + return packet; + } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=753178&r1=753177&r2=753178&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Fri Mar 13 09:04:23 2009 @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import org.apache.activeblaze.util.IOUtils; import org.apache.activeblaze.wire.BlazeData; import org.apache.activeblaze.wire.BoolType; @@ -53,26 +52,33 @@ import org.apache.activeblaze.wire.ShortType.ShortTypeBean; import org.apache.activeblaze.wire.StringType.StringTypeBean; import org.apache.activemq.protobuf.Buffer; - +import org.apache.activemq.protobuf.UTF8Buffer; /** - * A BlazeMessage object is used to send a set of name-value pairs. The names are String - * objects, and the values are primitive data types in the Java programming language. The names must have a value that - * is not null, and not an empty string. The entries can be accessed sequentially or randomly by name. The order of the - * entries is undefined. BlazeMessage inherits from the Message interface and adds a - * message body that contains a Map. + * A BlazeMessage object is used to send a set of name-value pairs. + * The names are String objects, and the values are primitive data + * types in the Java programming language. The names must have a value that is + * not null, and not an empty string. 
The entries can be accessed sequentially + * or randomly by name. The order of the entries is undefined. + * BlazeMessage inherits from the Message interface + * and adds a message body that contains a Map. *

- * The primitive types can be read or written explicitly using methods for each type. They may also be read or written - * generically as objects. For instance, a call to BlazeMessage.setInt("foo", 6) is equivalent to - * BlazeMessage.setObject("foo", new Integer(6)). Both forms are provided, because the explicit form is - * convenient for static programming, and the object form is needed when types are not known at compile time. + * The primitive types can be read or written explicitly using methods for each + * type. They may also be read or written generically as objects. For instance, + * a call to BlazeMessage.setInt("foo", 6) is equivalent to + * BlazeMessage.setObject("foo", new Integer(6)). Both forms are + * provided, because the explicit form is convenient for static programming, and + * the object form is needed when types are not known at compile time. *

*

- * BlazeMessage objects support the following conversion table. The marked cases must be supported. The - * unmarked cases must throw a JMSException. The String -to-primitive conversions may - * throw a runtime exception if the primitive's valueOf() method does not accept it as a valid - * String representation of the primitive. + * BlazeMessage objects support the following conversion table. The + * marked cases must be supported. The unmarked cases must throw a + * JMSException. The String -to-primitive conversions + * may throw a runtime exception if the primitive's valueOf() + * method does not accept it as a valid String representation of + * the primitive. *

- * A value written as the row type can be read as the column type.

+ * A value written as the row type can be read as the column type. + *

* *

  * | | boolean byte short char int long float double String byte[] |----------------------------------------------------------------------
@@ -83,13 +89,14 @@
  * 
  * 

*

- * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding - * valueOf(String) conversion method with a null value. Since char does not support a - * String conversion, attempting to read a null value as a char must throw a - * NullPointerException. + * Attempting to read a null value as a primitive type must be treated as + * calling the primitive's corresponding valueOf(String) conversion + * method with a null value. Since char does not support a + * String conversion, attempting to read a null value as a + * char must throw a NullPointerException. * */ -public class BlazeMessage implements Map{ +public class BlazeMessage implements Map { private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD"; private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD"; private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD"; @@ -107,22 +114,24 @@ private transient boolean persistent; private transient int type; private BlazeData content; - + private transient boolean loaded; + /** * Default Constructor */ public BlazeMessage() { } - + /** - * Constructor - Utility to construct a message with a text String payload + * Constructor - Utility to construct a message with a text + * String payload * * @param text */ public BlazeMessage(String text) { setStringValue(DEFAULT_TEXT_PAYLOAD, text); } - + /** * Constructor - Utility to construct a message with a byte[] array payload * @@ -131,7 +140,7 @@ public BlazeMessage(byte[] data) { setBytesValue(DEFAULT_BYTES_PAYLOAD, data); } - + /** * Constructor - Utility to construct a message with an object payload * @@ -140,269 +149,282 @@ public BlazeMessage(Object data) { setObject(data); } - + /** * Utility method for setting a default String payload * * @param text */ - public void setText(String text){ + public void setText(String text) { setStringValue(DEFAULT_TEXT_PAYLOAD, text); } - + /** - * Utility method used for when a 
BlazeMessage is only carrying a byte[] array + * Utility method used for when a BlazeMessage is only carrying a byte[] + * array * * @return text the default text * @throws Exception */ - public String getText() throws Exception{ + public String getText() throws Exception { return getStringValue(DEFAULT_TEXT_PAYLOAD); } - + /** * Utility method for setting a default byte[] payload * * @param payload */ - public void setBytes(byte[] payload){ + public void setBytes(byte[] payload) { setBytesValue(DEFAULT_BYTES_PAYLOAD, payload); } - + /** * Utility method used for when a BlazeMessage is only carrying an Object * * @return text the default text * @throws Exception */ - public Object getObject() throws Exception{ + public Object getObject() throws Exception { Buffer buffer = getBufferValue(DEFAULT_OBJECT_PAYLOAD); return IOUtils.getObject(buffer); } - + /** * Utility method for setting a default Object payload * * @param payload */ - public void setObject(Object payload){ + public void setObject(Object payload) { try { put(DEFAULT_OBJECT_PAYLOAD, IOUtils.getBuffer(payload)); } catch (Exception e) { throw new BlazeRuntimeException(e); } } - + /** * Utility method used for when a BlazeMessage is only carrying a String * * @return text the default text * @throws Exception */ - public byte[] getBytes() throws Exception{ + public byte[] getBytes() throws Exception { return getBytesValue(DEFAULT_BYTES_PAYLOAD); } - + /** * @return the destination */ - public Destination getDestination(){ + public Destination getDestination() { initializeReading(); return this.destination; } - + /** - * @param destination the destination to set + * @param destination + * the destination to set */ - public void setDestination(Destination destination){ + public void setDestination(Destination destination) { this.destination = destination; } - + /** * @param destination */ - public void setDestination(DestinationData destinationData){ + public void setDestination(DestinationData 
destinationData) { if (destinationData != null) { this.destination = new Destination(destinationData); } } - + /** * The id of the channel that sent the message * * @return the fromId */ - public String getFromId(){ + public String getFromId() { initializeReading(); return this.fromId; } - + /** - * @param fromId the fromId to set + * @param fromId + * the fromId to set */ - public void setFromId(String fromId){ + public void setFromId(String fromId) { this.fromId = fromId; } - + /** * @return the messageId */ - public String getMessageId(){ + public String getMessageId() { initializeReading(); return this.messageId; } - + /** - * @param messageId the messageId to set + * @param messageId + * the messageId to set */ - public void setMessageId(String messageId){ + public void setMessageId(String messageId) { this.messageId = messageId; } - + /** * @return the correlationId */ - public String getCorrelationId(){ + public String getCorrelationId() { initializeReading(); return this.correlationId; } - + /** - * @param correlationId the correlationId to set + * @param correlationId + * the correlationId to set */ - public void setCorrelationId(String correlationId){ + public void setCorrelationId(String correlationId) { this.correlationId = correlationId; } - + /** * @return the timeStamp */ - public long getTimeStamp(){ + public long getTimeStamp() { initializeReading(); return this.timeStamp; } - + /** - * @param timeStamp the timeStamp to set + * @param timeStamp + * the timeStamp to set */ - public void setTimeStamp(long timeStamp){ + public void setTimeStamp(long timeStamp) { this.timeStamp = timeStamp; } - + /** * @return the replyTo */ - public Destination getReplyTo(){ + public Destination getReplyTo() { initializeReading(); return this.replyTo; } - + /** - * @param replyTo the replyTo to set + * @param replyTo + * the replyTo to set */ - public void setReplyTo(Destination replyTo){ + public void setReplyTo(Destination replyTo) { this.replyTo = replyTo; } - + 
/** - * @param replyTo the replyTo to set + * @param replyTo + * the replyTo to set */ - public void setReplyTo(DestinationData replyTo){ + public void setReplyTo(DestinationData replyTo) { this.replyTo = new Destination(replyTo); } - + /** * @return the expiration */ - public long getExpiration(){ + public long getExpiration() { initializeReading(); return this.expiration; } - + /** - * @param expiration the expiration to set + * @param expiration + * the expiration to set */ - public void setExpiration(long expiration){ + public void setExpiration(long expiration) { this.expiration = expiration; } - + /** * @return the redeliveryCounter */ - public int getRedeliveryCounter(){ + public int getRedeliveryCounter() { initializeReading(); return this.redeliveryCounter; } - + /** - * @param redeliveryCounter the redeliveryCounter to set + * @param redeliveryCounter + * the redeliveryCounter to set */ - public void setRedeliveryCounter(int redeliveryCounter){ + public void setRedeliveryCounter(int redeliveryCounter) { this.redeliveryCounter = redeliveryCounter; } - + /** * @return the priority */ - public int getPriority(){ + public int getPriority() { initializeReading(); return this.priority; } - + /** - * @param priority the priority to set + * @param priority + * the priority to set */ - public void setPriority(int priority){ + public void setPriority(int priority) { this.priority = priority; } - + /** * @return the persistent */ - public boolean isPersistent(){ + public boolean isPersistent() { initializeReading(); return this.persistent; } - + /** - * @param persistent the persistent to set + * @param persistent + * the persistent to set */ - public void setPersistent(boolean persistent){ + public void setPersistent(boolean persistent) { this.persistent = persistent; } - + /** * @return the type */ - public String getMessageType(){ + public String getMessageType() { initializeReading(); return this.messageType; } - + /** - * @param type the type to set + * @param 
type + * the type to set */ - public void setMessageType(String type){ + public void setMessageType(String type) { this.messageType = type; } - + /** * Get the type * * @return the type */ - public int getType(){ + public int getType() { return this.type; } - - public void setType(int type){ + + public void setType(int type) { this.type = type; } - + /** * @return a copy of this message */ - public BlazeMessage clone(){ + public BlazeMessage clone() { BlazeMessage copy = new BlazeMessage(); try { copy(copy); @@ -411,22 +433,24 @@ } return copy; } - + /** * clear the contents of this message */ - public void clear(){ + public void clear() { this.map.clear(); } - + /** * Returns the boolean value with the specified name. * - * @param name the name of the boolean + * @param name + * the name of the boolean * @return the boolean value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public boolean getBooleanValue(String name) throws BlazeMessageFormatException{ + public boolean getBooleanValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -440,15 +464,17 @@ } throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName()); } - + /** * Returns the byte value with the specified name. * - * @param name the name of the byte + * @param name + * the name of the byte * @return the byte value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. 
*/ - public byte getByteValue(String name) throws BlazeMessageFormatException{ + public byte getByteValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -462,15 +488,17 @@ } throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName()); } - + /** * Returns the short value with the specified name. * - * @param name the name of the short + * @param name + * the name of the short * @return the short value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public short getShortValue(String name) throws BlazeMessageFormatException{ + public short getShortValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -487,15 +515,17 @@ } throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName()); } - + /** * Returns the Unicode character value with the specified name. * - * @param name the name of the Unicode character + * @param name + * the name of the Unicode character * @return the Unicode character value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public char getCharValue(String name) throws BlazeMessageFormatException{ + public char getCharValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -506,15 +536,17 @@ } throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName()); } - + /** * Returns the int value with the specified name. 
* - * @param name the name of the int + * @param name + * the name of the int * @return the int value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public int getIntValue(String name) throws BlazeMessageFormatException{ + public int getIntValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -534,15 +566,17 @@ } throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName()); } - + /** * Returns the long value with the specified name. * - * @param name the name of the long + * @param name + * the name of the long * @return the long value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public long getLongValue(String name) throws BlazeMessageFormatException{ + public long getLongValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -565,15 +599,17 @@ } throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName()); } - + /** * Returns the float value with the specified name. * - * @param name the name of the float + * @param name + * the name of the float * @return the float value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. 
*/ - public float getFloatValue(String name) throws BlazeMessageFormatException{ + public float getFloatValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -587,15 +623,17 @@ } throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName()); } - + /** * Returns the double value with the specified name. * - * @param name the name of the double + * @param name + * the name of the double * @return the double value with the specified name - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public double getDoubleValue(String name) throws BlazeMessageFormatException{ + public double getDoubleValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -612,16 +650,18 @@ } throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName()); } - + /** * Returns the String value with the specified name. * - * @param name the name of the String - * @return the String value with the specified name; if there is no item by this name, a null value - * is returned - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @param name + * the name of the String + * @return the String value with the specified name; if there + * is no item by this name, a null value is returned + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public String getStringValue(String name) throws BlazeMessageFormatException{ + public String getStringValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value == null) { @@ -632,15 +672,18 @@ } return value.toString(); } - + /** * Returns the byte array value with the specified name. 
* - * @param name the name of the byte array - * @return the byte array value with the specified name; if there is no item by this name, a null value is returned. - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @param name + * the name of the byte array + * @return the byte array value with the specified name; if there is no item + * by this name, a null value is returned. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public byte[] getBytesValue(String name) throws BlazeMessageFormatException{ + public byte[] getBytesValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value instanceof byte[]) { @@ -648,15 +691,18 @@ } throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName()); } - + /** * Returns a Buffer with the specified name. * - * @param name the name of the byte array - * @return the byte array value with the specified name; if there is no item by this name, a null value is returned. - * @throws BlazeMessageFormatException if this type conversion is invalid. + * @param name + * the name of the byte array + * @return the byte array value with the specified name; if there is no item + * by this name, a null value is returned. + * @throws BlazeMessageFormatException + * if this type conversion is invalid. */ - public Buffer getBufferValue(String name) throws BlazeMessageFormatException{ + public Buffer getBufferValue(String name) throws BlazeMessageFormatException { initializeReading(); Object value = this.map.get(name); if (value instanceof Buffer) { @@ -664,44 +710,50 @@ } throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName()); } - + /** * Returns the value of the object with the specified name. *

- * This method can be used to return, in objectified format, an object in the Java programming language ("Java - * object") that had been stored in the Map with the equivalent setObject method call, or its + * This method can be used to return, in objectified format, an object in + * the Java programming language ("Java object") that had been stored in the + * Map with the equivalent setObject method call, or its * equivalent primitive set type method. *

- * Note that byte values are returned as byte[], not Byte[]. + * Note that byte values are returned as byte[], not + * Byte[]. * - * @param name the name of the Java object - * @return a copy of the Java object value with the specified name, in objectified format (for example, if the - * object was set as an int, an Integer is returned); if there is no item by - * this name, a null value is returned + * @param name + * the name of the Java object + * @return a copy of the Java object value with the specified name, in + * objectified format (for example, if the object was set as an + * int, an Integer is returned); if there + * is no item by this name, a null value is returned */ - public Object getObjectValue(String name){ + public Object getObjectValue(String name) { initializeReading(); return this.map.get(name); } - + /** - * Returns an Enumeration of all the names in the BlazeMessage object. + * Returns an Enumeration of all the names in the + * BlazeMessage object. * * @return an enumeration of all the names in this BlazeMessage */ - public Enumeration getNames(){ + public Enumeration getNames() { initializeReading(); return Collections.enumeration(this.map.keySet()); } - + /** * put a key,value pair into the message * * @param name - * @param value must be a supported primitive, or map of supported primitives + * @param value + * must be a supported primitive, or map of supported primitives * @return the previous value associated with the key */ - public Object put(String name,Object value){ + public Object put(String name, Object value) { initializeWriting(); if (name == null) { throw new IllegalArgumentException("The name of the property cannot be null."); @@ -712,115 +764,137 @@ checkValidObject(value); return this.map.put(name, value); } - + /** * Sets a boolean value with the specified name into the Map. 
* - * @param name the name of the boolean - * @param value the boolean value to set in the Map + * @param name + * the name of the boolean + * @param value + * the boolean value to set in the Map */ - public void setBooleanValue(String name,boolean value){ + public void setBooleanValue(String name, boolean value) { initializeWriting(); put(name, value ? Boolean.TRUE : Boolean.FALSE); } - + /** * Sets a byte value with the specified name into the Map. * - * @param name the name of the byte - * @param value the byte value to set in the Map + * @param name + * the name of the byte + * @param value + * the byte value to set in the Map */ - public void setByteValue(String name,byte value){ + public void setByteValue(String name, byte value) { initializeWriting(); put(name, Byte.valueOf(value)); } - + /** * Sets a short value with the specified name into the Map. * - * @param name the name of the short - * @param value the short value to set in the Map + * @param name + * the name of the short + * @param value + * the short value to set in the Map */ - public void setShortValue(String name,short value){ + public void setShortValue(String name, short value) { initializeWriting(); put(name, Short.valueOf(value)); } - + /** * Sets a Unicode character value with the specified name into the Map. * - * @param name the name of the Unicode character - * @param value the Unicode character value to set in the Map + * @param name + * the name of the Unicode character + * @param value + * the Unicode character value to set in the Map */ - public void setCharValue(String name,char value){ + public void setCharValue(String name, char value) { initializeWriting(); put(name, Character.valueOf(value)); } - + /** * Sets an int value with the specified name into the Map. 
* - * @param name the name of the int - * @param value the int value to set in the Map + * @param name + * the name of the int + * @param value + * the int value to set in the Map */ - public void setIntValue(String name,int value){ + public void setIntValue(String name, int value) { initializeWriting(); put(name, Integer.valueOf(value)); } - + /** * Sets a long value with the specified name into the Map. * - * @param name the name of the long - * @param value the long value to set in the Map + * @param name + * the name of the long + * @param value + * the long value to set in the Map */ - public void setLongValue(String name,long value){ + public void setLongValue(String name, long value) { initializeWriting(); put(name, Long.valueOf(value)); } - + /** * Sets a float value with the specified name into the Map. * - * @param name the name of the float - * @param value the float value to set in the Map + * @param name + * the name of the float + * @param value + * the float value to set in the Map */ - public void setFloatValue(String name,float value){ + public void setFloatValue(String name, float value) { initializeWriting(); put(name, new Float(value)); } - + /** * Sets a double value with the specified name into the Map. * - * @param name the name of the double - * @param value the double value to set in the Map + * @param name + * the name of the double + * @param value + * the double value to set in the Map */ - public void setDoubleValue(String name,double value){ + public void setDoubleValue(String name, double value) { initializeWriting(); put(name, new Double(value)); } - + /** * Sets a String value with the specified name into the Map. 
* - * @param name the name of the String - * @param value the String value to set in the Map + * @param name + * the name of the String + * @param value + * the String value to set in the Map */ - public void setStringValue(String name,String value){ + public void setStringValue(String name, String value) { initializeWriting(); put(name, value); } - + /** * Sets a byte array value with the specified name into the Map. * - * @param name the name of the byte array - * @param value the byte array value to set in the Map; the array is copied so that the value for name - * will not be altered by future modifications - * @throws NullPointerException if the name is null, or if the name is an empty string. + * @param name + * the name of the byte array + * @param value + * the byte array value to set in the Map; the array is copied so + * that the value for name will not be altered by + * future modifications + * @throws NullPointerException + * if the name is null, or if the name is an empty string. */ - public void setBytesValue(String name,byte[] value){ + public void setBytesValue(String name, byte[] value) { initializeWriting(); if (value != null) { put(name, value); @@ -828,15 +902,18 @@ this.map.remove(name); } } - + /** * Sets a Buffer value with the specified name into the Map. * - * @param name the name of the byte array - * @param value the Buffer value to set in the Map - * @throws NullPointerException if the name is null, or if the name is an empty string. + * @param name + * the name of the byte array + * @param value + * the Buffer value to set in the Map + * @throws NullPointerException + * if the name is null, or if the name is an empty string. */ - public void setBufferValue(String name,Buffer value){ + public void setBufferValue(String name, Buffer value) { initializeWriting(); if (value != null) { put(name, value); @@ -844,22 +921,27 @@ this.map.remove(name); } } - + /** - * Sets a portion of the byte array value with the specified name into the Map. 
+ * Sets a portion of the byte array value with the specified name into the + * Map. * - * @param name the name of the byte array - * @param value the byte array value to set in the Map - * @param offset the initial offset within the byte array - * @param length the number of bytes to use + * @param name + * the name of the byte array + * @param value + * the byte array value to set in the Map + * @param offset + * the initial offset within the byte array + * @param length + * the number of bytes to use */ - public void setBytesValue(String name,byte[] value,int offset,int length){ + public void setBytesValue(String name, byte[] value, int offset, int length) { initializeWriting(); byte[] data = new byte[length]; System.arraycopy(value, offset, data, 0, length); put(name, data); } - + /** * Find out if the message contains a key This isn't recursive * @@ -867,11 +949,11 @@ * @return true if the message contains the key * */ - public boolean containsKey(Object key){ + public boolean containsKey(Object key) { initializeReading(); return this.map.containsKey(key.toString()); } - + /** * Find out if the message contains a value * @@ -879,60 +961,61 @@ * @return true if the value exists * */ - public boolean containsValue(Object value){ + public boolean containsValue(Object value) { initializeReading(); return this.map.containsValue(value); } - + /** * @return a set of Map.Entry values * */ - public Set> entrySet(){ + public Set> entrySet() { initializeReading(); return this.map.entrySet(); } - + /** * Retrieve the object associated with the key * * @param key * @return the object */ - public Object get(Object key){ + public Object get(Object key) { initializeReading(); return getObjectValue(key.toString()); } - + /** * @return true if the message is empty * */ - public boolean isEmpty(){ + public boolean isEmpty() { initializeReading(); return this.map.isEmpty(); } - + /** * @return a Set of all the keys */ - public Set keySet(){ + public Set keySet() { 
initializeReading(); return this.map.keySet(); } - + /** * Add all entries in a Map to the message * - * @param t the map + * @param t + * the map * */ - public void putAll(Map t){ + public void putAll(Map t) { for (Map.Entry entry : t.entrySet()) { put(entry.getKey(), entry.getValue()); } } - + /** * Remove a key/value pair from the message * @@ -940,46 +1023,46 @@ * @return the value removed or null * */ - public Object remove(Object key){ + public Object remove(Object key) { setContent(null); return this.map.remove(key.toString()); } - + /** * @return the number of entries in the message */ - public int size(){ + public int size() { initializeReading(); return this.map.size(); } - + /** * @return a Collection of the values in the message */ - public Collection values(){ + public Collection values() { initializeReading(); return this.map.values(); } - + /** * check if a named value exists in the message * * @param name * @return true if value exits */ - public boolean valueExists(String name){ + public boolean valueExists(String name) { return this.map.containsKey(name); } - - protected void initializeReading(){ + + protected void initializeReading() { loadContent(); } - - protected void initializeWriting(){ + + protected void initializeWriting() { setContent(null); } - - protected void checkValidObject(Object value) throws IllegalArgumentException{ + + protected void checkValidObject(Object value) throws IllegalArgumentException { boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long; valid = valid || value instanceof Float || value instanceof Double || value instanceof Character @@ -995,37 +1078,37 @@ throw new IllegalArgumentException("Not a valid message value: " + value); } } - + /** * @return pretty print * @see java.lang.Object#toString() */ - public String toString(){ + public String toString() { return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " 
}"; } - - protected void copy(BlazeMessage copy) throws BlazeException{ + + protected void copy(BlazeMessage copy) throws BlazeException { storeContent(); copy.content = this.content; } - + /** * @return the content data */ - public BlazeData getContent(){ + public BlazeData getContent() { return this.content; } - + /** * Set the content data * * @param content */ - public void setContent(BlazeData content){ + public void setContent(BlazeData content) { this.content = content; } - - protected void marshallMap(MapDataBean mapData,String name,Object value) throws BlazeRuntimeException{ + + protected void marshallMap(MapDataBean mapData, String name, Object value) throws BlazeRuntimeException { if (value != null) { if (value.getClass() == Boolean.class) { BoolTypeBean type = new BoolTypeBean(); @@ -1094,8 +1177,8 @@ } } } - - protected Map unmarshall(MapData mapData){ + + protected Map unmarshall(MapData mapData) { Map result = new ConcurrentHashMap(); if (mapData.hasBoolType()) { for (BoolType type : mapData.getBoolTypeList()) { @@ -1160,71 +1243,85 @@ } return result; } - + /** * Store content into a BlazeData object for serialization */ - public void storeContent(){ + public void storeContent() { if (getContent() == null) { BlazeDataBean bd = new BlazeDataBean(); - MapDataBean mapData = new MapDataBean(); - for (Map.Entry entry : this.map.entrySet()) { - marshallMap(mapData, entry.getKey().toString(), entry.getValue()); + if (!this.map.isEmpty()) { + MapDataBean mapData = new MapDataBean(); + for (Map.Entry entry : this.map.entrySet()) { + marshallMap(mapData, entry.getKey().toString(), entry.getValue()); + } + bd.setMapData(mapData); } - bd.setMapData(mapData); if (this.replyTo != null) { bd.setReplyToData(this.replyTo.getData()); } if (this.messageId != null) { - bd.setMessageId(new Buffer(this.messageId)); + bd.setMessageId(new UTF8Buffer(this.messageId)); } if (this.correlationId != null) { - bd.setCorrelationId(new Buffer(this.correlationId)); + 
bd.setCorrelationId(new UTF8Buffer(this.correlationId)); } if (this.fromId != null) { - bd.setFromId(new Buffer(this.fromId)); + bd.setFromId(new UTF8Buffer(this.fromId)); } if (this.messageType != null) { - bd.setMessageType(new Buffer(this.messageType)); + bd.setMessageType(new UTF8Buffer(this.messageType)); + } + if (this.timeStamp > 0) { + bd.setTimestamp(this.timeStamp); + } + if (this.expiration > 0) { + bd.setExpiration(this.expiration); + } + if (this.redeliveryCounter > 0) { + bd.setRedeliveryCounter(this.redeliveryCounter); + } + if (this.priority > 0) { + bd.setPriority(this.priority); + } + if (this.persistent) { + bd.setPersistent(this.persistent); } - bd.setTimestamp(this.timeStamp); - bd.setExpiration(this.expiration); - bd.setRedeliveryCounter(this.redeliveryCounter); - bd.setPriority(this.priority); - bd.setPersistent(this.persistent); this.content = bd; } } - + /** * Builds the message body from data * */ - protected void loadContent() throws BlazeRuntimeException{ - BlazeData data = getContent(); - if (data != null && this.map.isEmpty()) { - this.map = unmarshall(data.getMapData()); - if (data.hasReplyToData()) { - this.replyTo = new Destination(data.getReplyToData()); - } - if (data.hasFromId()) { - this.fromId = data.getFromId().toStringUtf8(); - } - if (data.hasMessageId()) { - this.messageId = data.getMessageId().toStringUtf8(); - } - if (data.hasCorrelationId()) { - this.correlationId = data.getCorrelationId().toStringUtf8(); - } - if (data.hasMessageType()) { - this.messageType = data.getMessageType().toStringUtf8(); + protected void loadContent() throws BlazeRuntimeException { + if (!this.loaded) { + this.loaded = true; + BlazeData data = getContent(); + if (data != null && this.map.isEmpty()) { + this.map = unmarshall(data.getMapData()); + if (data.hasReplyToData()) { + this.replyTo = new Destination(data.getReplyToData()); + } + if (data.hasFromId()) { + this.fromId = data.getFromId().toStringUtf8(); + } + if (data.hasMessageId()) { + 
this.messageId = data.getMessageId().toStringUtf8(); + } + if (data.hasCorrelationId()) { + this.correlationId = data.getCorrelationId().toStringUtf8(); + } + if (data.hasMessageType()) { + this.messageType = data.getMessageType().toStringUtf8(); + } + this.timeStamp = data.getTimestamp(); + this.expiration = data.getExpiration(); + this.redeliveryCounter = data.getRedeliveryCounter(); + this.priority = data.getPriority(); + this.persistent = data.getPersistent(); } - this.timeStamp = data.getTimestamp(); - this.expiration = data.getExpiration(); - this.redeliveryCounter = data.getRedeliveryCounter(); - this.priority = data.getPriority(); - this.persistent = data.getPersistent(); } } - } \ No newline at end of file Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java?rev=753178&r1=753177&r2=753178&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java Fri Mar 13 09:04:23 2009 @@ -28,7 +28,8 @@ * Process a PacketData of that is a BlazeMessage type * @param data * @return the built BlazeMessage + * @throws Exception */ - BlazeMessage processBlazeMessage(PacketData data); + BlazeMessage processBlazeMessage(PacketData data) throws Exception; } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=753178&r1=753177&r2=753178&view=diff ============================================================================== --- 
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Mar 13 09:04:23 2009 @@ -459,16 +459,11 @@ if (member != null) { SendRequest request = new SendRequest(); Destination dest = new Destination(destinationName, false); - message.setDestination(dest); - message.storeContent(); - BlazeDataBuffer blazeData = message.getContent().freeze(); - PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData); - packetData.setDestinationData(dest.getData()); - packetData.setPayloadType(message.getType()); + Packet packet = buildPacket(dest, message); synchronized (this.messageRequests) { - this.messageRequests.put(packetData.getMessageId(), request); + this.messageRequests.put(packet.getPacketData().getMessageId(), request); } - Packet packet = new Packet(packetData.freeze()); + packet.setTo((member).getAddress()); this.unicast.downStream(packet); PacketDataBuffer response = request.get(timeout); @@ -486,32 +481,15 @@ * org.apache.activeblaze.BlazeMessage, java.lang.String) */ public void sendReply(Member to,BlazeMessage response,String correlationId) throws Exception{ - response.storeContent(); - Destination dest = response.getDestination(); - BlazeDataBuffer blazeData = response.getContent().freeze(); - PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, blazeData); - data.setCorrelationId(new Buffer(correlationId)); - if (dest != null) { - data.setDestinationData(dest.getData()); - } - data.setPayloadType(response.getType()); - data.setReliable(true); - Packet packet = new Packet(data.freeze()); + Destination dest = new Destination(to.getInBoxDestination(), false); + Packet packet = buildPacket(dest,response,correlationId); packet.setTo(((MemberImpl) to).getAddress()); this.unicast.downStream(packet); } protected void send(MemberImpl member,Buffer destinationName,BlazeMessage message) 
throws Exception{ Destination dest = new Destination(destinationName, false); - message.setDestination(dest); - message.storeContent(); - - PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, message.getContent().freeze()); - data.setReliable(true); - data.setResponseRequired(true); - data.setDestinationData(dest.getData()); - data.setPayloadType(message.getType()); - Packet packet = new Packet(data.freeze()); + Packet packet = buildPacket(dest, message,true); packet.setTo(member.getAddress()); this.unicast.downStream(packet); } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=753178&r1=753177&r2=753178&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Mar 13 09:04:23 2009 @@ -17,21 +17,63 @@ package org.apache.activeblaze.impl.reliable.simple; import org.apache.activeblaze.impl.processor.DefaultChainedProcessor; - +import org.apache.activeblaze.impl.processor.Packet; /** * Very basic (none) reliability - * + * */ -public class SimpleReliableProcessor extends DefaultChainedProcessor{ - - private SimpleFlow simpleFlow; - - /** +public class SimpleReliableProcessor extends DefaultChainedProcessor { + int maxWindowSize = 64 * 1024; + int windowSize = 0; + int pauseTime = 0; + + /** * Constructor */ public SimpleReliableProcessor() { - this.simpleFlow=new SimpleFlow(); - //setEnd(this.simpleFlow); - } - + } + + /** + * @param p + * @throws Exception + * @see 
org.apache.activeblaze.impl.processor.DefaultChainedProcessor#downStream(org.apache.activeblaze.impl.processor.Packet) + */ + public void downStream(Packet p) throws Exception { + this.windowSize += p.getPacketData().serializedSizeFramed(); + if (this.windowSize >= this.maxWindowSize) { + Thread.sleep(this.pauseTime); + this.windowSize = 0; + } + super.downStream(p); + } + + /** + * @return the maxWindowSize + */ + public int getMaxWindowSize() { + return this.maxWindowSize; + } + + /** + * @param maxWindowSize + * the maxWindowSize to set + */ + public void setMaxWindowSize(int maxWindowSize) { + this.maxWindowSize = maxWindowSize; + } + + /** + * @return the pauseTime + */ + public int getPauseTime() { + return this.pauseTime; + } + + /** + * @param pauseTime + * the pauseTime to set + */ + public void setPauseTime(int pauseTime) { + this.pauseTime = pauseTime; + } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=753178&r1=753177&r2=753178&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Fri Mar 13 09:04:23 2009 @@ -18,7 +18,6 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; - import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -34,20 +33,24 @@ import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; - import org.apache.activeblaze.BlazeMessageListener; import org.apache.activeblaze.BlazeMessageProcessor; import org.apache.activeblaze.Subscription; import org.apache.activeblaze.group.BlazeGroupChannel; import 
org.apache.activeblaze.jms.message.BlazeJmsMessage; +import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation; import org.apache.activeblaze.util.IdGenerator; +import org.apache.activeblaze.wire.DestinationData; import org.apache.activeblaze.wire.PacketData; +import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer; +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.protobuf.InvalidProtocolBufferException; /** * Implementation of a JMS Connection * */ public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection, - org.apache.activeblaze.ExceptionListener,BlazeMessageProcessor{ + org.apache.activeblaze.ExceptionListener, BlazeMessageProcessor { protected final BlazeGroupChannel channel; protected final IdGenerator tempDestinationGenerator = new IdGenerator(""); private String clientId; @@ -60,7 +63,7 @@ protected BlazeJmsConnection(BlazeGroupChannel channel) { this.channel = channel; this.channel.setExceptionListener(this); - this.clientId = channel.getName(); + this.clientId = channel.getName(); this.channel.setBlazeMessageProcessor(this); } @@ -333,9 +336,9 @@ protected void removeMesssageDispatcher(BlazeMessageListener consumer, Subscription s) throws JMSException { try { if (s.isTopic()) { - this.channel.removeBlazeTopicMessageListener(s,consumer); + this.channel.removeBlazeTopicMessageListener(s, consumer); } else { - this.channel.removeBlazeQueueMessageListener(s,consumer); + this.channel.removeBlazeQueueMessageListener(s, consumer); } } catch (Exception e) { throw BlazeJmsExceptionSupport.create(e); @@ -362,28 +365,34 @@ } /** - * @param data + * @param data * @return a BlazeMessage + * @throws Exception * */ - public BlazeJmsMessage processBlazeMessage(PacketData data){ + public BlazeJmsMessage processBlazeMessage(PacketData data) throws Exception { BlazeJmsMessage result = null; - /* - int type = message.getType(); - if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) { - result = 
new BlazeJmsBytesMessage(); - } else if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) { - result = new BlazeJmsMapMessage(); - } else if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) { - result = new BlazeJmsObjectMessage(); - } else if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) { - result = new BlazeJmsStreamMessage(); - } else if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) { - result = new BlazeJmsTextMessage(); - } else { - result = new BlazeJmsMessage(); + if (data != null) { + DestinationData destination = data.getDestinationData(); + Buffer payload = data.getPayload(); + BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload); + String fromId = null; + if (data.hasProducerId()) { + fromId = data.getProducerId().toStringUtf8(); + } + result = BlazeJmsMessageTransformation.createMessage(data.getPayloadType()); + result.setDestination(destination); + result.setFromId(fromId); + if (data.hasMessageId()) { + result.setMessageId(data.getMessageId().toStringUtf8()); + } + if (data.hasCorrelationId()) { + result.setCorrelationId(data.getCorrelationId().toStringUtf8()); + } + result.setTimeStamp(blazeData.getTimestamp()); + result.setType(data.getPayloadType()); + result.setContent(blazeData); } - */ return result; } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java?rev=753178&r1=753177&r2=753178&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java Fri Mar 13 09:04:23 2009 @@ -17,7 +17,6 @@ package 
org.apache.activeblaze.jms.message; import java.util.Enumeration; - import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; @@ -27,34 +26,33 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; - import org.apache.activeblaze.BlazeMessage; import org.apache.activeblaze.jms.BlazeJmsDestination; - /** - * A helper class for converting normal JMS interfaces into ActiveMQ specific ones. + * A helper class for converting normal JMS interfaces into ActiveMQ specific + * ones. * * @version $Revision: 1.1 $ */ -public final class BlazeJmsMessageTransformation{ +public final class BlazeJmsMessageTransformation { private BlazeJmsMessageTransformation() { } - + /** * @param dest * @return a BlazeJmsDestination * @throws JMSException */ - private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException{ + private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException { return BlazeJmsDestination.transform(dest); } - + /** * @param message * @return a BlazeJmsMessage * @throws JMSException */ - public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException{ + public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException { BlazeJmsMessage result = null; if (message instanceof BlazeJmsMessage) { result = (BlazeJmsMessage) message; @@ -77,13 +75,13 @@ } return result; } - + /** * @param message * @return a BlazeJmsDestination * @throws JMSException */ - public static BlazeJmsMessage transformMessage(Message message) throws JMSException{ + public static BlazeJmsMessage transformMessage(Message message) throws JMSException { if (message instanceof BlazeJmsMessage) { return (BlazeJmsMessage) message; } @@ -143,15 +141,18 @@ copyProperties(message, transformedMessage); return transformedMessage; } - + /** - * Copies the standard JMS and user defined properties from the givem message to the 
specified message + * Copies the standard JMS and user defined properties from the givem + * message to the specified message * - * @param fromMessage the message to take the properties from - * @param toMessage the message to add the properties to + * @param fromMessage + * the message to take the properties from + * @param toMessage + * the message to add the properties to * @throws JMSException */ - public static void copyProperties(Message fromMessage,Message toMessage) throws JMSException{ + public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException { toMessage.setJMSMessageID(fromMessage.getJMSMessageID()); toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID()); toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo())); @@ -169,4 +170,27 @@ toMessage.setObjectProperty(name, obj); } } + + /** + * @param type + * @return a BlazeJmsMessage + */ + public static BlazeJmsMessage createMessage(int type) { + if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) { + return new BlazeJmsBytesMessage(); + } + if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) { + return new BlazeJmsMapMessage(); + } + if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) { + return new BlazeJmsObjectMessage(); + } + if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) { + return new BlazeJmsStreamMessage(); + } + if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) { + return new BlazeJmsTextMessage(); + } + return new BlazeJmsMessage(); + } }