Author: rajdavies
Date: Thu Mar 12 10:12:30 2009
New Revision: 752825
URL: http://svn.apache.org/viewvc?rev=752825&view=rev
Log:
performance improvements for JMS
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java
activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Thu Mar 12 10:12:30 2009
@@ -30,6 +30,7 @@
import org.apache.activeblaze.impl.reliable.ReliableFactory;
import org.apache.activeblaze.util.IdGenerator;
import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.wire.DestinationData;
import org.apache.activeblaze.wire.MessageType;
import org.apache.activeblaze.wire.PacketData;
import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
@@ -50,6 +51,7 @@
protected Buffer producerId;
protected Processor broadcast;
protected BlazeConfiguration configuration = new BlazeConfiguration();
+ protected BlazeMessageProcessor blazeMessageProcessor;
private String id;
/**
@@ -188,13 +190,15 @@
BlazeDataBuffer blazeData = msg.getContent().freeze();
PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
packetData.setReliable(true);
+ packetData.setDestinationData(destination.getData());
+ packetData.setPayloadType(msg.getType());
Packet packet = new Packet(packetData.freeze());
this.broadcast.downStream(packet);
}
protected final synchronized PacketDataBean getPacketData(MessageType type, MessageBuffer message) {
PacketDataBean packetData = new PacketDataBean();
- packetData.setType(type);
+ packetData.setMessageType(type);
packetData.setProducerId(this.producerId);
packetData.setPayload(message.toUnframedBuffer());
packetData.setMessageId(new Buffer(this.idGenerator.generateId()));
@@ -207,7 +211,7 @@
}
protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
- MessageType type = data.getType();
+ MessageType type = data.getMessageType();
if (type == MessageType.BLAZE_DATA) {
doProcessBlazeData(data);
}
@@ -225,6 +229,21 @@
public void setConfiguration(BlazeConfiguration configuration) {
this.configuration = configuration;
}
+
+
+ /**
+ * @return the blazeMessageProcessor
+ */
+ public BlazeMessageProcessor getBlazeMessageProcessor(){
+ return this.blazeMessageProcessor;
+ }
+
+ /**
+ * @param blazeMessageProcessor the blazeMessageProcessor to set
+ */
+ public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor){
+ this.blazeMessageProcessor = blazeMessageProcessor;
+ }
/**
* @param ex
@@ -235,45 +254,53 @@
}
protected void doProcessBlazeData(PacketData data) throws Exception {
- BlazeMessage message = buildBlazeMessage(data);
- dispatch(message);
+ dispatch(data);
}
protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
- BlazeMessage message = null;
+ BlazeMessage result = null;
+ if (this.blazeMessageProcessor != null) {
+ result = this.blazeMessageProcessor.processBlazeMessage(data);
+ }else {
+
if (data != null) {
+ DestinationData destination = data.getDestinationData();
Buffer payload = data.getPayload();
BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
String fromId = null;
if (data.hasProducerId()) {
fromId = data.getProducerId().toStringUtf8();
}
- message = createMessage(fromId);
- if( blazeData.hasDestinationData() ) {
- message.setDestination(blazeData.getDestinationData());
- }
- message.setFromId(fromId);
+ result = createMessage(fromId);
+ result.setDestination(destination);
+ result.setFromId(fromId);
if (data.hasMessageId()) {
- message.setMessageId(data.getMessageId().toStringUtf8());
+ result.setMessageId(data.getMessageId().toStringUtf8());
}
if (data.hasCorrelationId()) {
- message.setCorrelationId(data.getCorrelationId().toStringUtf8());
+ result.setCorrelationId(data.getCorrelationId().toStringUtf8());
}
- message.setTimeStamp(blazeData.getTimestamp());
- message.setContent(blazeData);
+ result.setTimeStamp(blazeData.getTimestamp());
+ result.setType(data.getPayloadType());
+ result.setContent(blazeData);
}
- return message;
+ }
+ return result;
}
protected BlazeMessage createMessage(String fromId) {
return new BlazeMessage();
}
- protected final void dispatch(BlazeMessage message) {
- if (message != null) {
- Buffer destination = message.getContent().getDestinationData().getName();
+ protected final void dispatch(PacketData data) throws Exception {
+ if (data != null) {
+ BlazeMessage message = null;
+ Buffer destination = data.getDestinationData().getName();
for (SubscriptionHolder entry : this.topicMessageListeners) {
if (entry.getSubscription().matches(destination)) {
+ if (message == null) {
+ message = buildBlazeMessage(data);
+ }
entry.getListener().onMessage(message);
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Thu Mar 12 10:12:30 2009
@@ -89,7 +89,7 @@
* <code>NullPointerException</code>.
*
*/
-public class BlazeMessage implements Map<String, Object> {
+public class BlazeMessage implements Map<String, Object>{
private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD";
@@ -107,13 +107,13 @@
private transient boolean persistent;
private transient int type;
private BlazeData content;
-
+
/**
* Default Constructor
*/
public BlazeMessage() {
}
-
+
/**
* Constructor - Utility to construct a message with a text <Code>String</Code> payload
*
@@ -122,7 +122,7 @@
public BlazeMessage(String text) {
setStringValue(DEFAULT_TEXT_PAYLOAD, text);
}
-
+
/**
* Constructor - Utility to construct a message with a byte[] array payload
*
@@ -131,7 +131,7 @@
public BlazeMessage(byte[] data) {
setBytesValue(DEFAULT_BYTES_PAYLOAD, data);
}
-
+
/**
* Constructor - Utility to construct a message with an object payload
*
@@ -140,275 +140,269 @@
public BlazeMessage(Object data) {
setObject(data);
}
-
+
/**
* Utility method for setting a default <Code>String</Code> payload
*
* @param text
*/
- public void setText(String text) {
+ public void setText(String text){
setStringValue(DEFAULT_TEXT_PAYLOAD, text);
}
-
+
/**
* Utility method used for when a BlazeMessage is only carrying a byte[] array
*
* @return text the default text
* @throws Exception
*/
- public String getText() throws Exception {
+ public String getText() throws Exception{
return getStringValue(DEFAULT_TEXT_PAYLOAD);
}
-
+
/**
* Utility method for setting a default <Code>byte[]</Code> payload
*
* @param payload
*/
- public void setBytes(byte[] payload) {
+ public void setBytes(byte[] payload){
setBytesValue(DEFAULT_BYTES_PAYLOAD, payload);
}
-
+
/**
* Utility method used for when a BlazeMessage is only carrying an Object
*
* @return text the default text
* @throws Exception
*/
- public Object getObject() throws Exception {
+ public Object getObject() throws Exception{
Buffer buffer = getBufferValue(DEFAULT_OBJECT_PAYLOAD);
return IOUtils.getObject(buffer);
}
-
+
/**
* Utility method for setting a default <Code>Object</Code> payload
*
* @param payload
*/
- public void setObject(Object payload) {
+ public void setObject(Object payload){
try {
put(DEFAULT_OBJECT_PAYLOAD, IOUtils.getBuffer(payload));
} catch (Exception e) {
throw new BlazeRuntimeException(e);
}
}
-
+
/**
* Utility method used for when a BlazeMessage is only carrying a String
*
* @return text the default text
* @throws Exception
*/
- public byte[] getBytes() throws Exception {
+ public byte[] getBytes() throws Exception{
return getBytesValue(DEFAULT_BYTES_PAYLOAD);
}
-
+
/**
* @return the destination
*/
- public Destination getDestination() {
+ public Destination getDestination(){
initializeReading();
return this.destination;
}
-
+
/**
- * @param destination
- * the destination to set
+ * @param destination the destination to set
*/
- public void setDestination(Destination destination) {
+ public void setDestination(Destination destination){
this.destination = destination;
}
-
+
/**
* @param destination
*/
- public void setDestination(DestinationData destination) {
- this.destination = new Destination(destination);
+ public void setDestination(DestinationData destinationData){
+ if (destinationData != null) {
+ this.destination = new Destination(destinationData);
+ }
}
-
+
/**
* The id of the channel that sent the message
*
* @return the fromId
*/
- public String getFromId() {
+ public String getFromId(){
initializeReading();
return this.fromId;
}
-
+
/**
- * @param fromId
- * the fromId to set
+ * @param fromId the fromId to set
*/
- public void setFromId(String fromId) {
+ public void setFromId(String fromId){
this.fromId = fromId;
}
-
+
/**
* @return the messageId
*/
- public String getMessageId() {
+ public String getMessageId(){
initializeReading();
return this.messageId;
}
-
+
/**
- * @param messageId
- * the messageId to set
+ * @param messageId the messageId to set
*/
- public void setMessageId(String messageId) {
+ public void setMessageId(String messageId){
this.messageId = messageId;
}
-
+
/**
* @return the correlationId
*/
- public String getCorrelationId() {
+ public String getCorrelationId(){
initializeReading();
return this.correlationId;
}
-
+
/**
- * @param correlationId
- * the correlationId to set
+ * @param correlationId the correlationId to set
*/
- public void setCorrelationId(String correlationId) {
+ public void setCorrelationId(String correlationId){
this.correlationId = correlationId;
}
-
+
/**
* @return the timeStamp
*/
- public long getTimeStamp() {
+ public long getTimeStamp(){
initializeReading();
return this.timeStamp;
}
-
+
/**
- * @param timeStamp
- * the timeStamp to set
+ * @param timeStamp the timeStamp to set
*/
- public void setTimeStamp(long timeStamp) {
+ public void setTimeStamp(long timeStamp){
this.timeStamp = timeStamp;
}
-
+
/**
* @return the replyTo
*/
- public Destination getReplyTo() {
+ public Destination getReplyTo(){
initializeReading();
return this.replyTo;
}
-
+
/**
- * @param replyTo
- * the replyTo to set
+ * @param replyTo the replyTo to set
*/
- public void setReplyTo(Destination replyTo) {
+ public void setReplyTo(Destination replyTo){
this.replyTo = replyTo;
}
-
+
/**
- * @param replyTo
- * the replyTo to set
+ * @param replyTo the replyTo to set
*/
- public void setReplyTo(DestinationData replyTo) {
+ public void setReplyTo(DestinationData replyTo){
this.replyTo = new Destination(replyTo);
}
-
+
/**
* @return the expiration
*/
- public long getExpiration() {
+ public long getExpiration(){
initializeReading();
return this.expiration;
}
-
+
/**
- * @param expiration
- * the expiration to set
+ * @param expiration the expiration to set
*/
- public void setExpiration(long expiration) {
+ public void setExpiration(long expiration){
this.expiration = expiration;
}
-
+
/**
* @return the redeliveryCounter
*/
- public int getRedeliveryCounter() {
+ public int getRedeliveryCounter(){
initializeReading();
return this.redeliveryCounter;
}
-
+
/**
- * @param redeliveryCounter
- * the redeliveryCounter to set
+ * @param redeliveryCounter the redeliveryCounter to set
*/
- public void setRedeliveryCounter(int redeliveryCounter) {
+ public void setRedeliveryCounter(int redeliveryCounter){
this.redeliveryCounter = redeliveryCounter;
}
-
+
/**
* @return the priority
*/
- public int getPriority() {
+ public int getPriority(){
initializeReading();
return this.priority;
}
-
+
/**
- * @param priority
- * the priority to set
+ * @param priority the priority to set
*/
- public void setPriority(int priority) {
+ public void setPriority(int priority){
this.priority = priority;
}
-
+
/**
* @return the persistent
*/
- public boolean isPersistent() {
+ public boolean isPersistent(){
initializeReading();
return this.persistent;
}
-
+
/**
- * @param persistent
- * the persistent to set
+ * @param persistent the persistent to set
*/
- public void setPersistent(boolean persistent) {
+ public void setPersistent(boolean persistent){
this.persistent = persistent;
}
-
+
/**
* @return the type
*/
- public String getMessageType() {
+ public String getMessageType(){
initializeReading();
return this.messageType;
}
-
+
/**
- * @param type
- * the type to set
+ * @param type the type to set
*/
- public void setMessageType(String type) {
+ public void setMessageType(String type){
this.messageType = type;
}
/**
* Get the type
+ *
* @return the type
*/
- public int getType() {
- initializeReading();
+ public int getType(){
return this.type;
}
-
+
+ public void setType(int type){
+ this.type = type;
+ }
+
/**
* @return a copy of this message
*/
- public BlazeMessage clone() {
+ public BlazeMessage clone(){
BlazeMessage copy = new BlazeMessage();
try {
copy(copy);
@@ -417,24 +411,22 @@
}
return copy;
}
-
+
/**
* clear the contents of this message
*/
- public void clear() {
+ public void clear(){
this.map.clear();
}
-
+
/**
* Returns the <CODE>boolean</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>boolean</CODE>
+ * @param name the name of the <CODE>boolean</CODE>
* @return the <CODE>boolean</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public boolean getBooleanValue(String name) throws BlazeMessageFormatException {
+ public boolean getBooleanValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -448,17 +440,15 @@
}
throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>byte</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>byte</CODE>
+ * @param name the name of the <CODE>byte</CODE>
* @return the <CODE>byte</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public byte getByteValue(String name) throws BlazeMessageFormatException {
+ public byte getByteValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -472,17 +462,15 @@
}
throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>short</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>short</CODE>
+ * @param name the name of the <CODE>short</CODE>
* @return the <CODE>short</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public short getShortValue(String name) throws BlazeMessageFormatException {
+ public short getShortValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -499,17 +487,15 @@
}
throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
}
-
+
/**
* Returns the Unicode character value with the specified name.
*
- * @param name
- * the name of the Unicode character
+ * @param name the name of the Unicode character
* @return the Unicode character value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public char getCharValue(String name) throws BlazeMessageFormatException {
+ public char getCharValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -520,17 +506,15 @@
}
throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>int</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>int</CODE>
+ * @param name the name of the <CODE>int</CODE>
* @return the <CODE>int</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public int getIntValue(String name) throws BlazeMessageFormatException {
+ public int getIntValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -550,17 +534,15 @@
}
throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>long</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>long</CODE>
+ * @param name the name of the <CODE>long</CODE>
* @return the <CODE>long</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public long getLongValue(String name) throws BlazeMessageFormatException {
+ public long getLongValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -583,17 +565,15 @@
}
throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>float</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>float</CODE>
+ * @param name the name of the <CODE>float</CODE>
* @return the <CODE>float</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public float getFloatValue(String name) throws BlazeMessageFormatException {
+ public float getFloatValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -607,17 +587,15 @@
}
throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>double</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>double</CODE>
+ * @param name the name of the <CODE>double</CODE>
* @return the <CODE>double</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public double getDoubleValue(String name) throws BlazeMessageFormatException {
+ public double getDoubleValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -634,18 +612,16 @@
}
throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>String</CODE> value with the specified name.
*
- * @param name
- * the name of the <CODE>String</CODE>
+ * @param name the name of the <CODE>String</CODE>
* @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
* is returned
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public String getStringValue(String name) throws BlazeMessageFormatException {
+ public String getStringValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -656,17 +632,15 @@
}
return value.toString();
}
-
+
/**
* Returns the byte array value with the specified name.
*
- * @param name
- * the name of the byte array
+ * @param name the name of the byte array
* @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public byte[] getBytesValue(String name) throws BlazeMessageFormatException {
+ public byte[] getBytesValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value instanceof byte[]) {
@@ -674,17 +648,15 @@
}
throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
}
-
+
/**
* Returns a Buffer with the specified name.
*
- * @param name
- * the name of the byte array
+ * @param name the name of the byte array
* @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
- public Buffer getBufferValue(String name) throws BlazeMessageFormatException {
+ public Buffer getBufferValue(String name) throws BlazeMessageFormatException{
initializeReading();
Object value = this.map.get(name);
if (value instanceof Buffer) {
@@ -692,7 +664,7 @@
}
throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
}
-
+
/**
* Returns the value of the object with the specified name.
* <P>
@@ -702,36 +674,34 @@
* <P>
* Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
*
- * @param name
- * the name of the Java object
+ * @param name the name of the Java object
* @return a copy of the Java object value with the specified name, in objectified format (for example, if the
* object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
* this name, a null value is returned
*/
- public Object getObjectValue(String name) {
+ public Object getObjectValue(String name){
initializeReading();
return this.map.get(name);
}
-
+
/**
* Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
*
* @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
*/
- public Enumeration<String> getNames() {
+ public Enumeration<String> getNames(){
initializeReading();
return Collections.enumeration(this.map.keySet());
}
-
+
/**
* put a key,value pair into the message
*
* @param name
- * @param value
- * must be a supported primitive, or map of supported primitives
+ * @param value must be a supported primitive, or map of supported primitives
* @return the previous value associated with the key
*/
- public Object put(String name, Object value) {
+ public Object put(String name,Object value){
initializeWriting();
if (name == null) {
throw new IllegalArgumentException("The name of the property cannot be null.");
@@ -742,136 +712,115 @@
checkValidObject(value);
return this.map.put(name, value);
}
-
+
/**
* Sets a <CODE>boolean</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>boolean</CODE>
- * @param value
- * the <CODE>boolean</CODE> value to set in the Map
+ * @param name the name of the <CODE>boolean</CODE>
+ * @param value the <CODE>boolean</CODE> value to set in the Map
*/
- public void setBooleanValue(String name, boolean value) {
+ public void setBooleanValue(String name,boolean value){
initializeWriting();
put(name, value ? Boolean.TRUE : Boolean.FALSE);
}
-
+
/**
* Sets a <CODE>byte</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>byte</CODE>
- * @param value
- * the <CODE>byte</CODE> value to set in the Map
+ * @param name the name of the <CODE>byte</CODE>
+ * @param value the <CODE>byte</CODE> value to set in the Map
*/
- public void setByteValue(String name, byte value) {
+ public void setByteValue(String name,byte value){
initializeWriting();
put(name, Byte.valueOf(value));
}
-
+
/**
* Sets a <CODE>short</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>short</CODE>
- * @param value
- * the <CODE>short</CODE> value to set in the Map
+ * @param name the name of the <CODE>short</CODE>
+ * @param value the <CODE>short</CODE> value to set in the Map
*/
- public void setShortValue(String name, short value) {
+ public void setShortValue(String name,short value){
initializeWriting();
put(name, Short.valueOf(value));
}
-
+
/**
* Sets a Unicode character value with the specified name into the Map.
*
- * @param name
- * the name of the Unicode character
- * @param value
- * the Unicode character value to set in the Map
+ * @param name the name of the Unicode character
+ * @param value the Unicode character value to set in the Map
*/
- public void setCharValue(String name, char value) {
+ public void setCharValue(String name,char value){
initializeWriting();
put(name, Character.valueOf(value));
}
-
+
/**
* Sets an <CODE>int</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>int</CODE>
- * @param value
- * the <CODE>int</CODE> value to set in the Map
+ * @param name the name of the <CODE>int</CODE>
+ * @param value the <CODE>int</CODE> value to set in the Map
*/
- public void setIntValue(String name, int value) {
+ public void setIntValue(String name,int value){
initializeWriting();
put(name, Integer.valueOf(value));
}
-
+
/**
* Sets a <CODE>long</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>long</CODE>
- * @param value
- * the <CODE>long</CODE> value to set in the Map
+ * @param name the name of the <CODE>long</CODE>
+ * @param value the <CODE>long</CODE> value to set in the Map
*/
- public void setLongValue(String name, long value) {
+ public void setLongValue(String name,long value){
initializeWriting();
put(name, Long.valueOf(value));
}
-
+
/**
* Sets a <CODE>float</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>float</CODE>
- * @param value
- * the <CODE>float</CODE> value to set in the Map
+ * @param name the name of the <CODE>float</CODE>
+ * @param value the <CODE>float</CODE> value to set in the Map
*/
- public void setFloatValue(String name, float value) {
+ public void setFloatValue(String name,float value){
initializeWriting();
put(name, new Float(value));
}
-
+
/**
* Sets a <CODE>double</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>double</CODE>
- * @param value
- * the <CODE>double</CODE> value to set in the Map
+ * @param name the name of the <CODE>double</CODE>
+ * @param value the <CODE>double</CODE> value to set in the Map
*/
- public void setDoubleValue(String name, double value) {
+ public void setDoubleValue(String name,double value){
initializeWriting();
put(name, new Double(value));
}
-
+
/**
* Sets a <CODE>String</CODE> value with the specified name into the Map.
*
- * @param name
- * the name of the <CODE>String</CODE>
- * @param value
- * the <CODE>String</CODE> value to set in the Map
+ * @param name the name of the <CODE>String</CODE>
+ * @param value the <CODE>String</CODE> value to set in the Map
*/
- public void setStringValue(String name, String value) {
+ public void setStringValue(String name,String value){
initializeWriting();
put(name, value);
}
-
+
/**
* Sets a byte array value with the specified name into the Map.
*
- * @param name
- * the name of the byte array
- * @param value
- * the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
+ * @param name the name of the byte array
+ * @param value the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
* will not be altered by future modifications
- * @throws NullPointerException
- * if the name is null, or if the name is an empty string.
+ * @throws NullPointerException if the name is null, or if the name is an empty string.
*/
- public void setBytesValue(String name, byte[] value) {
+ public void setBytesValue(String name,byte[] value){
initializeWriting();
if (value != null) {
put(name, value);
@@ -879,18 +828,15 @@
this.map.remove(name);
}
}
-
+
/**
* Sets a Buffer value with the specified name into the Map.
*
- * @param name
- * the name of the byte array
- * @param value
- * the Buffer value to set in the Map
- * @throws NullPointerException
- * if the name is null, or if the name is an empty string.
+ * @param name the name of the byte array
+ * @param value the Buffer value to set in the Map
+ * @throws NullPointerException if the name is null, or if the name is an empty string.
*/
- public void setBufferValue(String name, Buffer value) {
+ public void setBufferValue(String name,Buffer value){
initializeWriting();
if (value != null) {
put(name, value);
@@ -898,26 +844,22 @@
this.map.remove(name);
}
}
-
+
/**
* Sets a portion of the byte array value with the specified name into the Map.
*
- * @param name
- * the name of the byte array
- * @param value
- * the byte array value to set in the Map
- * @param offset
- * the initial offset within the byte array
- * @param length
- * the number of bytes to use
+ * @param name the name of the byte array
+ * @param value the byte array value to set in the Map
+ * @param offset the initial offset within the byte array
+ * @param length the number of bytes to use
*/
- public void setBytesValue(String name, byte[] value, int offset, int length) {
+ public void setBytesValue(String name,byte[] value,int offset,int length){
initializeWriting();
byte[] data = new byte[length];
System.arraycopy(value, offset, data, 0, length);
put(name, data);
}
-
+
/**
* Find out if the message contains a key This isn't recursive
*
@@ -925,11 +867,11 @@
* @return true if the message contains the key
*
*/
- public boolean containsKey(Object key) {
+ public boolean containsKey(Object key){
initializeReading();
return this.map.containsKey(key.toString());
}
-
+
/**
* Find out if the message contains a value
*
@@ -937,61 +879,60 @@
* @return true if the value exists
*
*/
- public boolean containsValue(Object value) {
+ public boolean containsValue(Object value){
initializeReading();
return this.map.containsValue(value);
}
-
+
/**
* @return a set of Map.Entry values
*
*/
- public Set<java.util.Map.Entry<String, Object>> entrySet() {
+ public Set<java.util.Map.Entry<String, Object>> entrySet(){
initializeReading();
return this.map.entrySet();
}
-
+
/**
* Retrieve the object associated with the key
*
* @param key
* @return the object
*/
- public Object get(Object key) {
+ public Object get(Object key){
initializeReading();
return getObjectValue(key.toString());
}
-
+
/**
* @return true if the message is empty
*
*/
- public boolean isEmpty() {
+ public boolean isEmpty(){
initializeReading();
return this.map.isEmpty();
}
-
+
/**
* @return a Set of all the keys
*/
- public Set<String> keySet() {
+ public Set<String> keySet(){
initializeReading();
return this.map.keySet();
}
-
+
/**
* Add all entries in a Map to the message
*
- * @param t
- * the map
+ * @param t the map
*
*/
- public void putAll(Map<? extends String, ? extends Object> t) {
+ public void putAll(Map<? extends String, ? extends Object> t){
for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}
-
+
/**
* Remove a key/value pair from the message
*
@@ -999,45 +940,46 @@
* @return the value removed or null
*
*/
- public Object remove(Object key) {
+ public Object remove(Object key){
setContent(null);
return this.map.remove(key.toString());
}
-
+
/**
* @return the number of entries in the message
*/
- public int size() {
+ public int size(){
initializeReading();
return this.map.size();
}
-
+
/**
* @return a Collection of the values in the message
*/
- public Collection<Object> values() {
+ public Collection<Object> values(){
initializeReading();
return this.map.values();
}
/**
* check if a named value exists in the message
+ *
* @param name
* @return true if value exists
*/
- public boolean valueExists(String name) {
+ public boolean valueExists(String name){
return this.map.containsKey(name);
}
-
- protected void initializeReading() {
+
+ protected void initializeReading(){
loadContent();
}
-
- protected void initializeWriting() {
+
+ protected void initializeWriting(){
setContent(null);
}
-
- protected void checkValidObject(Object value) throws IllegalArgumentException {
+
+ protected void checkValidObject(Object value) throws IllegalArgumentException{
boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
|| value instanceof Integer || value instanceof Long;
valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
@@ -1053,37 +995,37 @@
throw new IllegalArgumentException("Not a valid message value: " + value);
}
}
-
- /**
+
+ /**
* @return pretty print
* @see java.lang.Object#toString()
*/
- public String toString() {
+ public String toString(){
return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
}
-
- protected void copy(BlazeMessage copy) throws BlazeException {
+
+ protected void copy(BlazeMessage copy) throws BlazeException{
storeContent();
copy.content = this.content;
}
-
+
/**
* @return the content data
*/
- public BlazeData getContent() {
+ public BlazeData getContent(){
return this.content;
}
-
+
/**
* Set the content data
*
* @param content
*/
- public void setContent(BlazeData content) {
+ public void setContent(BlazeData content){
this.content = content;
}
-
- protected void marshallMap(MapDataBean mapData, String name, Object value) throws BlazeRuntimeException {
+
+ protected void marshallMap(MapDataBean mapData,String name,Object value) throws BlazeRuntimeException{
if (value != null) {
if (value.getClass() == Boolean.class) {
BoolTypeBean type = new BoolTypeBean();
@@ -1152,8 +1094,8 @@
}
}
}
-
- protected Map<String, Object> unmarshall(MapData mapData) {
+
+ protected Map<String, Object> unmarshall(MapData mapData){
Map<String, Object> result = new ConcurrentHashMap<String, Object>();
if (mapData.hasBoolType()) {
for (BoolType type : mapData.getBoolTypeList()) {
@@ -1218,11 +1160,11 @@
}
return result;
}
-
+
/**
* Store content into a BlazeData object for serialization
*/
- public void storeContent() {
+ public void storeContent(){
if (getContent() == null) {
BlazeDataBean bd = new BlazeDataBean();
MapDataBean mapData = new MapDataBean();
@@ -1230,9 +1172,6 @@
marshallMap(mapData, entry.getKey().toString(), entry.getValue());
}
bd.setMapData(mapData);
- if (this.destination != null) {
- bd.setDestinationData(this.destination.getData());
- }
if (this.replyTo != null) {
bd.setReplyToData(this.replyTo.getData());
}
@@ -1253,22 +1192,18 @@
bd.setRedeliveryCounter(this.redeliveryCounter);
bd.setPriority(this.priority);
bd.setPersistent(this.persistent);
- bd.setType(this.type);
this.content = bd;
}
}
-
+
/**
* Builds the message body from data
*
*/
- protected void loadContent() throws BlazeRuntimeException {
+ protected void loadContent() throws BlazeRuntimeException{
BlazeData data = getContent();
if (data != null && this.map.isEmpty()) {
this.map = unmarshall(data.getMapData());
- if (data.hasDestinationData()) {
- this.destination = new Destination(data.getDestinationData());
- }
if (data.hasReplyToData()) {
this.replyTo = new Destination(data.getReplyToData());
}
@@ -1281,7 +1216,7 @@
if (data.hasCorrelationId()) {
this.correlationId = data.getCorrelationId().toStringUtf8();
}
- if(data.hasMessageType()) {
+ if (data.hasMessageType()) {
this.messageType = data.getMessageType().toStringUtf8();
}
this.timeStamp = data.getTimestamp();
@@ -1289,10 +1224,7 @@
this.redeliveryCounter = data.getRedeliveryCounter();
this.priority = data.getPriority();
this.persistent = data.getPersistent();
- this.type=data.getType();
-
}
}
-
-
+
}
\ No newline at end of file
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java?rev=752825&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java Thu Mar 12 10:12:30 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * BlazeMessageProcessor - build a BlazeMessage from PacketData
+ *
+ */
+public interface BlazeMessageProcessor{
+
+ /**
+ * Process a PacketData that is of a BlazeMessage type
+ * @param data
+ * @return the built BlazeMessage
+ */
+ BlazeMessage processBlazeMessage(PacketData data);
+
+}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Thu Mar 12 10:12:30 2009
@@ -24,7 +24,6 @@
import org.apache.activeblaze.group.MemberImpl;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.AckData;
import org.apache.activeblaze.wire.MessageType;
import org.apache.activeblaze.wire.PacketData;
import org.apache.activeblaze.wire.AckData.AckDataBuffer;
@@ -164,7 +163,7 @@
protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
if (isStarted()) {
processRequest(correlationId, data);
- MessageType type = data.getType();
+ MessageType type = data.getMessageType();
switch(type) {
case BLAZE_DATA:
doProcessBlazeData(data);
@@ -244,7 +243,7 @@
this.unicast.downStream(packet);
PacketDataBuffer response = request.get(timeout);
if (response != null) {
- type = response.getType();
+ type = response.getMessageType();
result = type.parseUnframed(response.getPayload());
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Thu Mar 12 10:12:30 2009
@@ -18,9 +18,11 @@
import java.util.List;
import java.util.Set;
+
import org.apache.activeblaze.BlazeChannel;
import org.apache.activeblaze.BlazeMessage;
import org.apache.activeblaze.BlazeMessageListener;
+import org.apache.activeblaze.BlazeMessageProcessor;
import org.apache.activeblaze.Destination;
import org.apache.activeblaze.Subscription;
@@ -283,4 +285,14 @@
* @throws Exception
*/
public List<String> getGroups() throws Exception;
+
+ /**
+ * @param processor
+ */
+ public void setBlazeMessageProcessor(BlazeMessageProcessor processor);
+
+ /**
+ * @return BlazeMessageProcessor
+ */
+ public BlazeMessageProcessor getBlazeMessageProcessor();
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Thu Mar 12 10:12:30 2009
@@ -55,25 +55,26 @@
import org.apache.activemq.protobuf.MessageBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
/**
* <P>
- * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
- * communication
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
*
*/
-public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel{
private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
private String name;
protected Processor unicast;
private MemberImpl local;
private BlazeMessageListener inboxListener;
- protected Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(10000);
+ protected Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
+ 10000);
private final List<SubscriptionHolder> queueMessageListeners = new CopyOnWriteArrayList<SubscriptionHolder>();
private Group group;
protected Buffer inboxAddress;
protected int inBoxPort;
protected final Object localMutex = new Object();
-
+
/**
* Constructor
*
@@ -83,12 +84,12 @@
super();
this.name = name;
}
-
+
/**
* @throws Exception
* @see org.apache.activeblaze.Service#init()
*/
- public void doInit() throws Exception {
+ public void doInit() throws Exception{
super.doInit();
String unicastURIStr = getConfiguration().getUnicastURI();
unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
@@ -106,8 +107,8 @@
this.local = createLocal(unicastURI);
this.group = createGroup();
}
-
- protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
+
+ protected final Processor configureProcess(ChainedProcessor transport,String reliability) throws Exception{
int maxPacketSize = getConfiguration().getMaxPacketSize();
CompressionProcessor result = new CompressionProcessor();
result.setPrev(this);
@@ -121,25 +122,25 @@
result.setEnd(transport);
return result;
}
-
- protected ChainedProcessor getReliability(String reliability) throws Exception {
+
+ protected ChainedProcessor getReliability(String reliability) throws Exception{
DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
return reliable;
}
-
- protected MemberImpl createLocal(URI uri) throws Exception {
+
+ protected MemberImpl createLocal(URI uri) throws Exception{
return new MemberImpl(getId(), getName(), 0, 0, uri);
}
-
- protected Group createGroup() {
+
+ protected Group createGroup(){
return new Group(this);
}
-
+
/**
* @throws Exception
* @see org.apache.activeblaze.Service#shutDown()
*/
- public void doShutDown() throws Exception {
+ public void doShutDown() throws Exception{
super.doShutDown();
if (this.group != null) {
this.group.shutDown();
@@ -148,43 +149,43 @@
this.unicast.shutDown();
}
}
-
+
/**
* @throws Exception
* @see org.apache.activeblaze.Service#start()
*/
- public void doStart() throws Exception {
+ public void doStart() throws Exception{
super.doStart();
this.unicast.start();
this.group.start();
}
-
+
/**
* @throws Exception
* @see org.apache.activeblaze.Service#stop()
*/
- public void doStop() throws Exception {
+ public void doStop() throws Exception{
super.doStop();
this.group.stop();
this.unicast.stop();
}
-
+
/**
* @return the name
*/
- public String getName() {
+ public String getName(){
synchronized (this.localMutex) {
return this.name;
}
}
-
+
/**
* set the name
*
* @param name
* @see org.apache.activeblaze.group.BlazeGroupChannel#setName(java.lang.String)
*/
- public void setName(String name) {
+ public void setName(String name){
synchronized (this.localMutex) {
this.name = name;
if (this.local != null) {
@@ -192,88 +193,87 @@
}
}
}
-
+
/**
* @return the inboxListener
*/
- public BlazeMessageListener getInboxListener() {
+ public BlazeMessageListener getInboxListener(){
return this.inboxListener;
}
-
+
/**
- * @param inboxListener
- * the inboxListener to set
+ * @param inboxListener the inboxListener to set
*/
- public void setInboxListener(BlazeMessageListener inboxListener) {
+ public void setInboxListener(BlazeMessageListener inboxListener){
this.inboxListener = inboxListener;
}
-
+
/**
* @return this channel's configuration
* @see org.apache.activeblaze.group.BlazeGroupChannel#getConfiguration()
*/
- public BlazeGroupConfiguration getConfiguration() {
+ public BlazeGroupConfiguration getConfiguration(){
return (BlazeGroupConfiguration) this.configuration;
}
-
+
/**
* @return the member for this channel
* @see org.apache.activeblaze.group.BlazeGroupChannel#getLocalMember()
*/
- public MemberImpl getLocalMember() {
+ public MemberImpl getLocalMember(){
synchronized (this.localMutex) {
return this.local;
}
}
-
- protected void setLocalMember(MemberImpl local) {
+
+ protected void setLocalMember(MemberImpl local){
synchronized (this.localMutex) {
this.local = local;
}
}
-
+
/**
* @param l
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
*/
- public void addMemberChangedListener(MemberChangedListener l) throws Exception {
+ public void addMemberChangedListener(MemberChangedListener l) throws Exception{
init();
this.group.addMemberChangedListener(l);
}
-
+
/**
* @param l
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
*/
- public void removeMemberChangedListener(MemberChangedListener l) throws Exception {
+ public void removeMemberChangedListener(MemberChangedListener l) throws Exception{
init();
this.group.removeMemberChangedListener(l);
}
-
+
/**
* @param id
* @return the Member
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
*/
- public Member getMemberById(String id) throws Exception {
+ public Member getMemberById(String id) throws Exception{
init();
return this.group.getMemberById(id);
}
-
+
/**
* @param name
* @return the Member
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
*/
- public Member getMemberByName(String name) throws Exception {
+ public Member getMemberByName(String name) throws Exception{
init();
return this.group.getMemberByName(name);
}
-
+
/**
* Will wait for a member to advertise itself if not available
*
@@ -282,44 +282,42 @@
* @return the member or null
* @throws Exception
*/
- public Member getAndWaitForMemberByName(String name, int timeout) throws Exception {
+ public Member getAndWaitForMemberByName(String name,int timeout) throws Exception{
init();
return this.group.getAndWaitForMemberByName(name, timeout);
}
-
+
/**
* @return the members
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
*/
- public Set<Member> getMembers() throws Exception {
+ public Set<Member> getMembers() throws Exception{
init();
return this.group.getMembers();
}
-
+
/**
* Send a message to a member of the group - in a round-robin fashion
*
* @param destination
* @param message
* @throws Exception
- * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
- * org.apache.activeblaze.BlazeMessage)
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
*/
- public void send(String destination, BlazeMessage message) throws Exception {
+ public void send(String destination,BlazeMessage message) throws Exception{
send(new Destination(destination, false), message);
}
-
+
/**
* Send a message to a member of the group - in a round-robin fashion
*
* @param destination
* @param message
* @throws Exception
- * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
- * org.apache.activeblaze.BlazeMessage)
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
*/
- public void send(Destination destination, BlazeMessage message) throws Exception {
+ public void send(Destination destination,BlazeMessage message) throws Exception{
while (true) {
MemberImpl member = getQueueDestination(destination.getName());
if (member != null) {
@@ -334,7 +332,7 @@
}
}
}
-
+
/**
* @param member
* @param message
@@ -342,10 +340,10 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage)
*/
- public void send(Member member, BlazeMessage message) throws Exception {
+ public void send(Member member,BlazeMessage message) throws Exception{
send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
}
-
+
/**
* @param member
* @param message
@@ -354,10 +352,10 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage)
*/
- public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
+ public BlazeMessage sendRequest(Member member,BlazeMessage message) throws Exception{
return sendRequest(member, message, 0);
}
-
+
/**
* @param member
* @param message
@@ -367,10 +365,10 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage, int)
*/
- public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
+ public BlazeMessage sendRequest(Member member,BlazeMessage message,int timeout) throws Exception{
return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
}
-
+
/**
* @param destination
* @param message
@@ -379,10 +377,10 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage)
*/
- public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
+ public BlazeMessage sendRequest(String destination,BlazeMessage message) throws Exception{
return sendRequest(new Destination(destination, false), message, 0);
}
-
+
/**
* @param destination
* @param message
@@ -391,10 +389,10 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage)
*/
- public BlazeMessage sendRequest(Destination destination, BlazeMessage message) throws Exception {
+ public BlazeMessage sendRequest(Destination destination,BlazeMessage message) throws Exception{
return sendRequest(destination, message, 0);
}
-
+
/**
* @param destination
* @param message
@@ -404,10 +402,10 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage, int)
*/
- public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
+ public BlazeMessage sendRequest(String destination,BlazeMessage message,int timeout) throws Exception{
return sendRequest(new Destination(destination, false), message, timeout);
}
-
+
/**
* @param destination
* @param message
@@ -417,7 +415,7 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
* org.apache.activeblaze.BlazeMessage, int)
*/
- public BlazeMessage sendRequest(Destination destination, BlazeMessage message, int timeout) throws Exception {
+ public BlazeMessage sendRequest(Destination destination,BlazeMessage message,int timeout) throws Exception{
Buffer key = destination.getName();
long deadline = 0;
long waitTime = timeout;
@@ -444,7 +442,7 @@
}
return null;
}
-
+
/**
* send Request
*
@@ -455,15 +453,18 @@
* @return the response
* @throws Exception
*/
- public BlazeMessage sendRequest(MemberImpl member, Buffer destinationName, BlazeMessage message, int timeout)
- throws Exception {
+ public BlazeMessage sendRequest(MemberImpl member,Buffer destinationName,BlazeMessage message,int timeout)
+ throws Exception{
BlazeMessage result = null;
if (member != null) {
SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
- message.setDestination(new Destination(destinationName, false));
+ Destination dest = new Destination(destinationName, false);
+ message.setDestination(dest);
message.storeContent();
BlazeDataBuffer blazeData = message.getContent().freeze();
PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+ packetData.setDestinationData(dest.getData());
+ packetData.setPayloadType(message.getType());
synchronized (this.messageRequests) {
this.messageRequests.put(packetData.getMessageId(), request);
}
@@ -475,7 +476,7 @@
}
return result;
}
-
+
/**
* @param to
* @param response
@@ -484,18 +485,23 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
* org.apache.activeblaze.BlazeMessage, java.lang.String)
*/
- public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
+ public void sendReply(Member to,BlazeMessage response,String correlationId) throws Exception{
response.storeContent();
+ Destination dest = response.getDestination();
BlazeDataBuffer blazeData = response.getContent().freeze();
PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, blazeData);
data.setCorrelationId(new Buffer(correlationId));
+ if (dest != null) {
+ data.setDestinationData(dest.getData());
+ }
+ data.setPayloadType(response.getType());
data.setReliable(true);
Packet packet = new Packet(data.freeze());
packet.setTo(((MemberImpl) to).getAddress());
this.unicast.downStream(packet);
}
-
- protected void send(MemberImpl member, Buffer destinationName, BlazeMessage message) throws Exception {
+
+ protected void send(MemberImpl member,Buffer destinationName,BlazeMessage message) throws Exception{
Destination dest = new Destination(destinationName, false);
message.setDestination(dest);
message.storeContent();
@@ -503,11 +509,13 @@
PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, message.getContent().freeze());
data.setReliable(true);
data.setResponseRequired(true);
+ data.setDestinationData(dest.getData());
+ data.setPayloadType(message.getType());
Packet packet = new Packet(data.freeze());
packet.setTo(member.getAddress());
this.unicast.downStream(packet);
}
-
+
/**
* @param destination
* @param l
@@ -515,63 +523,62 @@
* @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
* org.apache.activeblaze.group.BlazeMessageListener)
*/
- public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
+ public void addBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
init();
- SubscriptionHolder key = new SubscriptionHolder(destination, false,l);
+ SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
+
+ this.queueMessageListeners.add(key);
- this.queueMessageListeners.add(key);
-
buildLocal();
}
-
+
/**
- * @param subscription
+ * @param subscription
* @param l
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
* org.apache.activeblaze.group.BlazeMessageListener)
*/
- public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
+ public void addBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
init();
SubscriptionHolder key = new SubscriptionHolder(subscription, l);
- this.queueMessageListeners.add(key);
-
+ this.queueMessageListeners.add(key);
+
buildLocal();
}
-
+
/**
* @param destination
* @param l
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
*/
- public void removeBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception {
+ public void removeBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
init();
- SubscriptionHolder key = new SubscriptionHolder(destination, false,l);
+ SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
+
+ this.queueMessageListeners.remove(key);
-
- this.queueMessageListeners.remove(key);
-
buildLocal();
-
+
}
-
+
/**
* @param subscription
* @param l
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
*/
- public void removeBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception {
+ public void removeBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
init();
-
+
SubscriptionHolder key = new SubscriptionHolder(subscription, l);
- this.queueMessageListeners.remove(key);
-
+ this.queueMessageListeners.remove(key);
+
buildLocal();
-
+
}
-
+
/**
* @param destination
* @param l
@@ -579,59 +586,59 @@
* @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
* org.apache.activeblaze.BlazeMessageListener)
*/
- public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
+ public void addBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
init();
super.addBlazeTopicMessageListener(destination, l);
buildLocal();
}
-
+
/**
* @param destination
* @return the removed <Code>BlazeTopicListener</Code>
* @throws Exception
* @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
*/
- public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception {
+ public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
init();
- super.removeBlazeTopicMessageListener(destination,l);
+ super.removeBlazeTopicMessageListener(destination, l);
buildLocal();
-
+
}
-
+
/**
* @param groupName
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#addToGroup(java.lang.String)
*/
- public void addToGroup(String groupName) throws Exception {
+ public void addToGroup(String groupName) throws Exception{
init();
this.local.addToGroup(groupName);
}
-
+
/**
* @param groupName
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#removeFromGroup(java.lang.String)
*/
- public void removeFromGroup(String groupName) throws Exception {
+ public void removeFromGroup(String groupName) throws Exception{
init();
this.local.removeFromGroup(groupName);
}
-
+
/**
* @return the groups
* @throws Exception
* @see org.apache.activeblaze.group.BlazeGroupChannel#getGroups()
*/
- public List<String> getGroups() throws Exception {
+ public List<String> getGroups() throws Exception{
init();
return this.local.getGroups();
}
-
- protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
+
+ protected void processData(String id,Buffer correlationId,PacketDataBuffer data) throws Exception{
if (isStarted()) {
if (!processRequest(correlationId, data)) {
- MessageType type = data.getType();
+ MessageType type = data.getMessageType();
if (type == MessageType.BLAZE_DATA) {
doProcessBlazeData(data);
} else if (type == MessageType.MEMBER_DATA) {
@@ -640,8 +647,8 @@
}
}
}
-
- protected boolean processRequest(Buffer correlationId, PacketDataBuffer value) {
+
+ protected boolean processRequest(Buffer correlationId,PacketDataBuffer value){
boolean result = false;
if (correlationId != null) {
SendRequest<PacketDataBuffer> request = null;
@@ -655,61 +662,61 @@
}
return result;
}
-
- protected void doProcessBlazeData(PacketData data) throws Exception {
- BlazeMessage message = buildBlazeMessage(data);
- if (message.getContent().getDestinationData().getTopic()) {
- dispatch(message);
+
+ protected void doProcessBlazeData(PacketData data) throws Exception{
+
+ if (data.hasDestinationData()&&data.getDestinationData().getTopic()) {
+ dispatch(data);
} else {
- Buffer destinationName = message.getContent().getDestinationData().getName();
+
+ Buffer destinationName = data.getDestinationData().getName();
+ BlazeMessage message = buildBlazeMessage(data);
if (this.inboxListener != null && this.producerId.equals(destinationName)) {
this.inboxListener.onMessage(message);
} else {
- int index=0;
- for (SubscriptionHolder entry : this.queueMessageListeners) {
- if (entry.getSubscription().matches(destinationName)) {
- entry.getListener().onMessage(message);
- this.queueMessageListeners.remove(index);
- this.queueMessageListeners.add(entry);
- }
- index++;
+ int index = 0;
+ for (SubscriptionHolder entry : this.queueMessageListeners) {
+ if (entry.getSubscription().matches(destinationName)) {
+ entry.getListener().onMessage(message);
+ this.queueMessageListeners.remove(index);
+ this.queueMessageListeners.add(entry);
}
-
-
+ index++;
+ }
}
}
}
-
- protected Group getGroup() {
+
+ protected Group getGroup(){
return this.group;
}
-
- protected BlazeMessage createMessage(String fromId) {
+
+ protected BlazeMessage createMessage(String fromId){
Member member = this.group.getMemberById(fromId);
BlazeMessage message = new BlazeGroupMessage(member);
return message;
}
-
- protected final void doProcessMemberData(PacketData data) throws Exception {
+
+ protected final void doProcessMemberData(PacketData data) throws Exception{
Buffer payload = data.getPayload();
MemberDataBuffer memberData = MemberDataBuffer.parseUnframed(payload);
this.group.processMember(memberData);
}
-
+
/**
* @param messageType
* @param message
* @throws Exception
*/
- public void broadcastMessage(MessageType messageType, MessageBuffer message) throws Exception {
+ public void broadcastMessage(MessageType messageType,MessageBuffer message) throws Exception{
PacketDataBean data = getPacketData(messageType, message);
data.setReliable(false);
Packet packet = new Packet(data.freeze());
this.broadcast.downStreamManagement(packet);
}
-
+
/**
* send a message
*
@@ -719,7 +726,8 @@
* @param message
* @throws Exception
*/
- public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member, MessageType messageType, MessageBuffer message) throws Exception {
+ public void sendMessage(AsyncGroupRequest asyncRequest,MemberImpl member,MessageType messageType,
+ MessageBuffer message) throws Exception{
SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
PacketDataBean data = getPacketData(messageType, message);
asyncRequest.add(data.getMessageId(), request);
@@ -731,7 +739,7 @@
packet.setTo(member.getAddress());
this.unicast.downStream(packet);
}
-
+
/**
* broadcast a general message
*
@@ -740,28 +748,28 @@
* @param correlationId
* @throws Exception
*/
- public void broadcastMessage(MessageType messageType, MessageBuffer message, String correlationId) throws Exception {
+ public void broadcastMessage(MessageType messageType,MessageBuffer message,String correlationId) throws Exception{
PacketDataBean data = getPacketData(messageType, message);
data.setCorrelationId(new Buffer(correlationId));
data.setReliable(true);
Packet packet = new Packet(data.freeze());
this.broadcast.downStreamManagement(packet);
}
-
+
/**
* @param to
* @param messageType
* @param message
* @throws Exception
*/
- public void sendMessage(InetSocketAddress to, MessageType messageType, MessageBuffer message) throws Exception {
+ public void sendMessage(InetSocketAddress to,MessageType messageType,MessageBuffer message) throws Exception{
PacketDataBean data = getPacketData(messageType, message);
data.setReliable(false);
Packet packet = new Packet(data.freeze());
packet.setTo(to);
this.unicast.downStream(packet);
}
-
+
/**
* @param to
* @param messageType
@@ -769,8 +777,8 @@
* @param correlationId
* @throws Exception
*/
- public void sendReply(MemberImpl to, MessageType messageType, MessageBuffer message, String correlationId)
- throws Exception {
+ public void sendReply(MemberImpl to,MessageType messageType,MessageBuffer message,String correlationId)
+ throws Exception{
PacketDataBean data = getPacketData(messageType, message);
data.setCorrelationId(new Buffer(correlationId));
data.setReliable(false);
@@ -778,8 +786,8 @@
packet.setTo(to.getAddress());
this.unicast.downStream(packet);
}
-
- protected MemberImpl getQueueDestination(Buffer destination) {
+
+ protected MemberImpl getQueueDestination(Buffer destination){
// choose a member
MemberImpl result = null;
Map<Subscription, List<MemberImpl>> map = this.group.getQueueMap();
@@ -801,8 +809,8 @@
}
return result;
}
-
- protected void buildLocal() {
+
+ protected void buildLocal(){
if (isInitialized()) {
try {
synchronized (this.localMutex) {
@@ -818,7 +826,7 @@
for (SubscriptionHolder s : this.queueMessageListeners) {
bean.addSubscriptionData(s.getSubscription().getData());
}
-
+
MemberImpl result = new MemberImpl(bean.freeze());
this.group.processMemberUpdate(this.local, result);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Thu Mar 12 10:12:30 2009
@@ -59,7 +59,7 @@
void processInBound(Packet packet) throws Exception {
PacketData packetData = packet.getPacketData();
- MessageType type = packetData.getType();
+ MessageType type = packetData.getMessageType();
if (type == MessageType.CONTROL_DATA) {
if (this.replayBuffer.isEmpty()) {
// send back a control message
@@ -119,7 +119,7 @@
PacketDataBean pd = new PacketDataBean();
pd.setResponseRequired(false);
pd.setPayload(nack.freeze().toUnframedBuffer());
- pd.setType(MessageType.NACK_DATA);
+ pd.setMessageType(MessageType.NACK_DATA);
Packet nackPacket = new Packet(pd.freeze());
nackPacket.setTo(this.peerAddress);
this.swp.sendDownStream(nackPacket);
@@ -160,7 +160,7 @@
PacketDataBean pd = new PacketDataBean();
pd.setResponseRequired(false);
pd.setPayload(ack.freeze().toUnframedBuffer());
- pd.setType(MessageType.ACK_DATA);
+ pd.setMessageType(MessageType.ACK_DATA);
ackPacket = new Packet(pd.freeze());
ackPacket.setTo(this.peerAddress);
this.lastAckTime = System.currentTimeMillis();
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Thu Mar 12 10:12:30 2009
@@ -96,7 +96,7 @@
Packet result = null;
PacketData data = packet.getPacketData();
if (data != null) {
- MessageType type = data.getType();
+ MessageType type = data.getMessageType();
if (type == MessageType.ACK_DATA) {
AckDataBuffer ackData = AckDataBuffer.parseUnframed(data.getPayload());
long start = ackData.getStartSequence();
@@ -158,7 +158,7 @@
PacketDataBean pd = new PacketDataBean();
pd.setResponseRequired(false);
pd.setPayload(control.freeze().toUnframedBuffer());
- pd.setType(MessageType.CONTROL_DATA);
+ pd.setMessageType(MessageType.CONTROL_DATA);
ackPacket = new Packet(pd.freeze());
ackPacket.setTo(this.peerAddress);
LOG.debug(this + " Sent Control message " + control);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Thu Mar 12 10:12:30 2009
@@ -169,7 +169,7 @@
pd.setCorrelationId(data.getMessageId());
pd.setResponse(true);
pd.setPayload(ackData.freeze().toUnframedBuffer());
- pd.setType(MessageType.ACK_DATA);
+ pd.setMessageType(MessageType.ACK_DATA);
Packet packet = new Packet(pd.freeze());
return packet;
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Thu Mar 12 10:12:30 2009
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -33,16 +34,20 @@
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
+
import org.apache.activeblaze.BlazeMessageListener;
+import org.apache.activeblaze.BlazeMessageProcessor;
import org.apache.activeblaze.Subscription;
import org.apache.activeblaze.group.BlazeGroupChannel;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.wire.PacketData;
/**
* Implementation of a JMS Connection
*
*/
public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
- org.apache.activeblaze.ExceptionListener {
+ org.apache.activeblaze.ExceptionListener,BlazeMessageProcessor{
protected final BlazeGroupChannel channel;
protected final IdGenerator tempDestinationGenerator = new IdGenerator("");
private String clientId;
@@ -55,7 +60,8 @@
protected BlazeJmsConnection(BlazeGroupChannel channel) {
this.channel = channel;
this.channel.setExceptionListener(this);
- this.clientId = channel.getName();
+ this.clientId = channel.getName();
+ this.channel.setBlazeMessageProcessor(this);
}
/**
@@ -354,4 +360,30 @@
public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {
this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth;
}
+
+ /**
+ * @param data
+ * @return a BlazeMessage
+ *
+ */
+ public BlazeJmsMessage processBlazeMessage(PacketData data){
+ BlazeJmsMessage result = null;
+ /*
+ int type = message.getType();
+ if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) {
+ result = new BlazeJmsBytesMessage();
+ } else if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) {
+ result = new BlazeJmsMapMessage();
+ } else if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) {
+ result = new BlazeJmsObjectMessage();
+ } else if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) {
+ result = new BlazeJmsStreamMessage();
+ } else if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) {
+ result = new BlazeJmsTextMessage();
+ } else {
+ result = new BlazeJmsMessage();
+ }
+ */
+ return result;
+ }
}
|