activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r739885 [1/5] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/transport/ main/java/org/apache/activeblaze/jms/ main/java/org/apache/act...
Date Sun, 01 Feb 2009 23:35:56 GMT
Author: rajdavies
Date: Sun Feb  1 23:35:54 2009
New Revision: 739885

URL: http://svn.apache.org/viewvc?rev=739885&view=rev
Log:
Added framework for JMS support

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java
      - copied, changed from r738496, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionClosedException.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSession.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java   (with props)
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Subscription.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Sun Feb  1 23:35:54 2009
@@ -44,9 +44,7 @@
     }
 
     public final boolean shutDown() throws Exception {
-        if (isStarted()) {
-            stop();
-        }
+        stop();
         boolean result = this.initialialized.compareAndSet(true, false);
         this.initialialized.set(false);
         if (result) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java Sun Feb  1 23:35:54 2009
@@ -46,15 +46,31 @@
      * @param l
      * @throws Exception 
      */
-    public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception;
+    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception;
     
+    
+    /**
+     * Add a listener for messages
+     * @param subscription
+     * @param l
+     * @throws Exception
+     */
+    public void addBlazeTopicMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception;
     /**
      * Remove a listener for messages
      * @param destination
      * @return the removed listener
      * @throws Exception 
      */
-    public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception;
+    public BlazeMessageListener removeBlazeTopicMessageListener(String destination) throws Exception;
+    
+    /**
+     * Remove the subscription
+     * @param suscription
+     * @return the original subscription
+     * @throws Exception
+     */
+    public BlazeMessageListener removeBlazeTopicMessageListener(Subscription suscription) throws Exception;
     
     /**
      * Set an exception listener for async exceptions that can be generated

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Sun Feb  1 23:35:54 2009
@@ -19,7 +19,6 @@
 import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activeblaze.impl.destination.DestinationMatch;
 import org.apache.activeblaze.impl.network.Network;
 import org.apache.activeblaze.impl.network.NetworkFactory;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -44,7 +43,7 @@
  * 
  */
 public class BlazeChannelImpl extends DefaultChainedProcessor implements BlazeChannel, ExceptionListener {
-    protected Map<Buffer, BlazeTopicListener> topicessageListenerMap = new ConcurrentHashMap<Buffer, BlazeTopicListener>();
+    protected Map<Subscription, BlazeMessageListener> topicessageListenerMap = new ConcurrentHashMap<Subscription, BlazeMessageListener>();
     protected final IdGenerator idGenerator = new IdGenerator();
     protected Buffer producerId;
     protected Processor broadcast;
@@ -71,11 +70,23 @@
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
-     *      org.apache.activeblaze.BlazeTopicListener)
+     *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception {
-        Buffer key = new Buffer(destination);
-        this.topicessageListenerMap.put(key, l);
+    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
+        Subscription sub = new Subscription(destination);
+        this.topicessageListenerMap.put(sub, l);
+    }
+    
+    /**
+     * @param subscription
+     * @param l
+     * @throws Exception
+     * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessageListener)
+     */
+    public void addBlazeTopicMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
+       
+        this.topicessageListenerMap.put(subscription, l);
     }
 
     /**
@@ -85,8 +96,20 @@
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String destination)
      */
-    public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
-        Buffer key = new Buffer(destination);
+    public BlazeMessageListener removeBlazeTopicMessageListener(String destination) throws Exception {
+        Subscription key = new Subscription(destination);
+        return this.topicessageListenerMap.remove(key);
+    }
+    
+    /**
+     * @param key
+     * 
+     * @return the TopicListener
+     * @throws Exception
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String destination)
+     */
+    public BlazeMessageListener removeBlazeTopicMessageListener(Subscription key) throws Exception {
+      
         return this.topicessageListenerMap.remove(key);
     }
 
@@ -228,8 +251,8 @@
     protected final void dispatch(BlazeMessage message) {
         if (message != null) {
             Buffer destination = message.getContent().getDestinationData().getName();
-            for (Map.Entry<Buffer, BlazeTopicListener> entry : this.topicessageListenerMap.entrySet()) {
-                if (DestinationMatch.isMatch(destination, entry.getKey())) {
+            for (Map.Entry<Subscription, BlazeMessageListener> entry : this.topicessageListenerMap.entrySet()) {
+                if (entry.getKey().matches(destination)) {
                     entry.getValue().onMessage(message);
                 }
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Sun Feb  1 23:35:54 2009
@@ -85,6 +85,7 @@
     private transient String fromId;
     private transient String messageId;
     private transient String correlationId;
+    private transient String type;
     private transient long timeStamp;
     private transient long expiration;
     private transient int redeliveryCounter;
@@ -104,7 +105,7 @@
      * @param text
      */
     public BlazeMessage(String text) {
-        setString(DEFAULT_TEXT_PAYLOAD, text);
+        setStringValue(DEFAULT_TEXT_PAYLOAD, text);
     }
 
     /**
@@ -113,7 +114,7 @@
      * @param data
      */
     public BlazeMessage(byte[] data) {
-        setBytes(DEFAULT_BYTES_PAYLOAD, data);
+        setBytesValue(DEFAULT_BYTES_PAYLOAD, data);
     }
 
     /**
@@ -131,7 +132,7 @@
      * @param text
      */
     public void setText(String text) {
-        setString(DEFAULT_TEXT_PAYLOAD, text);
+        setStringValue(DEFAULT_TEXT_PAYLOAD, text);
     }
 
     /**
@@ -141,7 +142,7 @@
      * @throws Exception
      */
     public String getText() throws Exception {
-        return getString(DEFAULT_TEXT_PAYLOAD);
+        return getStringValue(DEFAULT_TEXT_PAYLOAD);
     }
 
     /**
@@ -150,7 +151,7 @@
      * @param payload
      */
     public void setBytes(byte[] payload) {
-        setBytes(DEFAULT_BYTES_PAYLOAD, payload);
+        setBytesValue(DEFAULT_BYTES_PAYLOAD, payload);
     }
 
     /**
@@ -160,7 +161,7 @@
      * @throws Exception
      */
     public Object getObject() throws Exception {
-        Buffer buffer = getBuffer(DEFAULT_OBJECT_PAYLOAD);
+        Buffer buffer = getBufferValue(DEFAULT_OBJECT_PAYLOAD);
         return IOUtils.getObject(buffer);
     }
 
@@ -184,7 +185,7 @@
      * @throws Exception
      */
     public byte[] getBytes() throws Exception {
-        return getBytes(DEFAULT_BYTES_PAYLOAD);
+        return getBytesValue(DEFAULT_BYTES_PAYLOAD);
     }
 
     /**
@@ -201,7 +202,7 @@
     public void setDestination(Destination destination) {
         this.destination = destination;
     }
-    
+
     /**
      * @param destination
      */
@@ -285,7 +286,7 @@
     public void setReplyTo(Destination replyTo) {
         this.replyTo = replyTo;
     }
-    
+
     /**
      * @param replyTo
      *            the replyTo to set
@@ -355,6 +356,21 @@
     }
 
     /**
+     * @return the type
+     */
+    public String getType() {
+        return this.type;
+    }
+
+    /**
+     * @param type
+     *            the type to set
+     */
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /**
      * @return a copy of this message
      */
     public BlazeMessage clone() {
@@ -383,7 +399,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public boolean getBoolean(String name) throws BlazeMessageFormatException {
+    public boolean getBooleanValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -407,7 +423,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public byte getByte(String name) throws BlazeMessageFormatException {
+    public byte getByteValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -431,7 +447,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public short getShort(String name) throws BlazeMessageFormatException {
+    public short getShortValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -458,7 +474,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public char getChar(String name) throws BlazeMessageFormatException {
+    public char getCharValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -479,7 +495,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public int getInt(String name) throws BlazeMessageFormatException {
+    public int getIntValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -509,7 +525,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public long getLong(String name) throws BlazeMessageFormatException {
+    public long getLongValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -542,7 +558,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public float getFloat(String name) throws BlazeMessageFormatException {
+    public float getFloatValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -566,7 +582,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public double getDouble(String name) throws BlazeMessageFormatException {
+    public double getDoubleValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -594,7 +610,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public String getString(String name) throws BlazeMessageFormatException {
+    public String getStringValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -615,7 +631,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public byte[] getBytes(String name) throws BlazeMessageFormatException {
+    public byte[] getBytesValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value instanceof byte[]) {
@@ -633,7 +649,7 @@
      * @throws BlazeMessageFormatException
      *             if this type conversion is invalid.
      */
-    public Buffer getBuffer(String name) throws BlazeMessageFormatException {
+    public Buffer getBufferValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value instanceof Buffer) {
@@ -657,7 +673,7 @@
      *         object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
      *         this name, a null value is returned
      */
-    public Object getObject(String name) {
+    public Object getObjectValue(String name) {
         initializeReading();
         return this.map.get(name);
     }
@@ -667,7 +683,7 @@
      * 
      * @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
      */
-    public Enumeration<String> getMapNames() {
+    public Enumeration<String> getNames() {
         initializeReading();
         return Collections.enumeration(this.map.keySet());
     }
@@ -700,7 +716,7 @@
      * @param value
      *            the <CODE>boolean</CODE> value to set in the Map
      */
-    public void setBoolean(String name, boolean value) {
+    public void setBooleanValue(String name, boolean value) {
         initializeWriting();
         put(name, value ? Boolean.TRUE : Boolean.FALSE);
     }
@@ -713,7 +729,7 @@
      * @param value
      *            the <CODE>byte</CODE> value to set in the Map
      */
-    public void setByte(String name, byte value) {
+    public void setByteValue(String name, byte value) {
         initializeWriting();
         put(name, Byte.valueOf(value));
     }
@@ -726,7 +742,7 @@
      * @param value
      *            the <CODE>short</CODE> value to set in the Map
      */
-    public void setShort(String name, short value) {
+    public void setShortValue(String name, short value) {
         initializeWriting();
         put(name, Short.valueOf(value));
     }
@@ -739,7 +755,7 @@
      * @param value
      *            the Unicode character value to set in the Map
      */
-    public void setChar(String name, char value) {
+    public void setCharValue(String name, char value) {
         initializeWriting();
         put(name, Character.valueOf(value));
     }
@@ -752,7 +768,7 @@
      * @param value
      *            the <CODE>int</CODE> value to set in the Map
      */
-    public void setInt(String name, int value) {
+    public void setIntValue(String name, int value) {
         initializeWriting();
         put(name, Integer.valueOf(value));
     }
@@ -765,7 +781,7 @@
      * @param value
      *            the <CODE>long</CODE> value to set in the Map
      */
-    public void setLong(String name, long value) {
+    public void setLongValue(String name, long value) {
         initializeWriting();
         put(name, Long.valueOf(value));
     }
@@ -778,7 +794,7 @@
      * @param value
      *            the <CODE>float</CODE> value to set in the Map
      */
-    public void setFloat(String name, float value) {
+    public void setFloatValue(String name, float value) {
         initializeWriting();
         put(name, new Float(value));
     }
@@ -791,7 +807,7 @@
      * @param value
      *            the <CODE>double</CODE> value to set in the Map
      */
-    public void setDouble(String name, double value) {
+    public void setDoubleValue(String name, double value) {
         initializeWriting();
         put(name, new Double(value));
     }
@@ -804,7 +820,7 @@
      * @param value
      *            the <CODE>String</CODE> value to set in the Map
      */
-    public void setString(String name, String value) {
+    public void setStringValue(String name, String value) {
         initializeWriting();
         put(name, value);
     }
@@ -820,7 +836,7 @@
      * @throws NullPointerException
      *             if the name is null, or if the name is an empty string.
      */
-    public void setBytes(String name, byte[] value) {
+    public void setBytesValue(String name, byte[] value) {
         initializeWriting();
         if (value != null) {
             put(name, value);
@@ -839,7 +855,7 @@
      * @throws NullPointerException
      *             if the name is null, or if the name is an empty string.
      */
-    public void setBuffer(String name, Buffer value) {
+    public void setBufferValue(String name, Buffer value) {
         initializeWriting();
         if (value != null) {
             put(name, value);
@@ -860,7 +876,7 @@
      * @param length
      *            the number of bytes to use
      */
-    public void setBytes(String name, byte[] value, int offset, int length) {
+    public void setBytesValue(String name, byte[] value, int offset, int length) {
         initializeWriting();
         byte[] data = new byte[length];
         System.arraycopy(value, offset, data, 0, length);
@@ -908,7 +924,7 @@
      */
     public Object get(Object key) {
         initializeReading();
-        return getObject(key.toString());
+        return getObjectValue(key.toString());
     }
 
     /**
@@ -968,12 +984,21 @@
         initializeReading();
         return this.map.values();
     }
+    
+    /**
+     * check if a named value exists in the message
+     * @param name
+     * @return true if value exits
+     */
+    public boolean valueExists(String name) {
+        return this.map.containsKey(name);
+    }
 
-    private void initializeReading() {
+    protected void initializeReading() {
         loadContent();
     }
 
-    private void initializeWriting() {
+    protected void initializeWriting() {
         setContent(null);
     }
 
@@ -983,7 +1008,7 @@
         valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
                 || value instanceof String || value == null || value instanceof byte[];
         if (value instanceof Map) {
-            Map map = (Map) value;
+            Map<?, ?> map = (Map<?, ?>) value;
             for (Object v : map.values()) {
                 checkValidObject(v);
             }
@@ -1019,7 +1044,7 @@
         this.content = content;
     }
 
-    private void marshallMap(MapData mapData, String name, Object value) throws BlazeMessageFormatException {
+    protected void marshallMap(MapData mapData, String name, Object value) throws BlazeRuntimeException {
         if (value != null) {
             if (value.getClass() == Boolean.class) {
                 BoolType type = new BoolType();
@@ -1084,12 +1109,12 @@
                     mapData.addMapType(md);
                 }
             } else {
-                throw new BlazeMessageFormatException("Cannot seralize type " + value);
+                throw new BlazeRuntimeException("Cannot seralize type " + value);
             }
         }
     }
 
-    Map<String, Object> unmarshall(MapData mapData) {
+    protected Map<String, Object> unmarshall(MapData mapData) {
         Map<String, Object> result = new ConcurrentHashMap<String, Object>();
         if (mapData.hasBoolType()) {
             for (BoolType type : mapData.getBoolTypeList()) {
@@ -1157,9 +1182,8 @@
 
     /**
      * Store content into a BlazeData object for serialization
-     * @throws BlazeMessageFormatException
      */
-    public void storeContent() throws BlazeMessageFormatException {
+    public void storeContent() {
         if (getContent() == null && !this.map.isEmpty()) {
             BlazeData bd = new BlazeData();
             MapData mapData = new MapData();
@@ -1182,6 +1206,9 @@
             if (this.fromId != null) {
                 bd.setFromId(new Buffer(this.fromId));
             }
+            if (this.type != null) {
+                bd.setType(new Buffer(this.type));
+            }
             bd.setTimestamp(getTimeStamp());
             bd.setExpiration(getExpiration());
             bd.setRedeliveryCounter(getRedeliveryCounter());
@@ -1195,7 +1222,7 @@
      * Builds the message body from data
      * 
      */
-    void loadContent() throws BlazeRuntimeException {
+    protected void loadContent() throws BlazeRuntimeException {
         BlazeData data = getContent();
         if (data != null && this.map.isEmpty()) {
             this.map = unmarshall(data.getMapData());
@@ -1214,11 +1241,15 @@
             if (data.hasCorrelationId()) {
                 this.correlationId = data.getCorrelationId().toStringUtf8();
             }
+            if(data.hasType()) {
+                this.type = data.getType().toStringUtf8();
+            }
             this.timeStamp = data.getTimestamp();
             this.expiration = data.getExpiration();
             this.redeliveryCounter = data.getRedeliveryCounter();
             this.priority = data.getPriority();
             this.persistent = data.getPersistent();
+           
         }
     }
 }
\ No newline at end of file

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java (from r738496, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java&r1=738496&r2=739885&rev=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java Sun Feb  1 23:35:54 2009
@@ -20,7 +20,7 @@
  * A listener for BlazeMessages
  *
  */
-public interface BlazeTopicListener {
+public interface BlazeMessageListener {
     
     /**
      * Called when a Message is available to be processes

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java Sun Feb  1 23:35:54 2009
@@ -24,15 +24,22 @@
  *
  */
 public class Destination {
-    private Buffer name;
-    private boolean topic=true;
-    private boolean temporary;
-    private DestinationData data;
+    
+    private final DestinationData data;
     
     /**
      * Default Constructor
      */
     public Destination() {
+        this.data = new DestinationData();
+    }
+    
+    /**
+     * Constructor
+     * @param data
+     */
+    public Destination(DestinationData data) {
+        this.data= data;
     }
     
     /**
@@ -43,6 +50,7 @@
         this(name,true);
     }
     
+    
     /**
      * Constructor
      * @param name
@@ -59,9 +67,10 @@
      * @param temporary
      */
     public Destination(String name,boolean topic,boolean temporary) {
-        this.name=new Buffer(name);
-        this.topic=topic;
-        this.temporary=temporary;
+        this.data=new DestinationData();
+        this.data.setName(new Buffer(name));
+        this.data.setTopic(topic);
+        this.data.setTemporary(temporary);
     }
     
     /**
@@ -88,87 +97,68 @@
      * @param temporary
      */
     public Destination(Buffer name,boolean topic,boolean temporary) {
-        this.name=name;
-        this.topic=topic;
-        this.temporary=temporary;
+        this.data=new DestinationData();
+        this.data.setName(name);
+        this.data.setTopic(topic);
+        this.data.setTemporary(temporary);
     }
     
-    /**
-     * Constructor
-     * @param data
-     */
-    public Destination(DestinationData data){
-        setData(data);
-    }
+   
     /**
      * @return the name
      */
     public Buffer getName() {
-        return this.name;
+        return this.data.getName();
     }
     /**
      * @param name the name to set
      */
     public void setName(Buffer name) {
-        this.name = name;
+        this.data.setName(name);
     }
     /**
      * @return the topic
      */
     public boolean isTopic() {
-        return this.topic;
+        return this.data.getTopic();
     }
     /**
      * @param topic the topic to set
      */
     public void setTopic(boolean topic) {
-        this.topic = topic;
+        this.data.setTopic(topic);
     }
     /**
      * @return the temporary
      */
     public boolean isTemporary() {
-        return this.temporary;
+        return this.data.getTopic();
     }
     /**
      * @param temporary the temporary to set
      */
     public void setTemporary(boolean temporary) {
-        this.temporary = temporary;
+        this.data.setTemporary(temporary);
     }
     
     /**
      * @return true if a Topic
      */
     public boolean isQueue() {
-        return !this.topic;
+        return !isTopic();
     }
     /**
      * @return the data
      */
-    public synchronized DestinationData getData() {
-        if (this.data==null) {
-            this.data = new DestinationData();
-            this.data.setName(getName());
-            this.data.setTopic(isTopic());
-            this.data.setTemporary(isTemporary());
-        }
-        return this.data;
-    }
-    /**
-     * @param data the data to set
-     */
-    public synchronized void setData(DestinationData data) {
-        this.data = data;
+    public  DestinationData getData() {
         
-        this.name = data.getName();
-        this.topic = data.getTopic();
-        this.temporary=data.getTemporary();
+        return this.data;
     }
     
     
+    
     public String toString() {
-        return "Destination{"+ (isTopic()?"Topic":"Queue") + ": "  +this.name.toStringUtf8()+"}";
+        return "Destination{"+ (isTopic()?"Topic":"Queue") + ": "  +this.data.getName().toStringUtf8()+"}";
     }
    
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activeblaze.wire.SubscriptionData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Subscription Info
+ * 
+ */
+public class Subscription {
+    private final SubscriptionData data;
+
+    /**
+     * Default Constructor
+     */
+    public Subscription() {
+        this.data = new SubscriptionData();
+    }
+
+    /**
+     * Default Constructor
+     * 
+     * @param data
+     */
+    public Subscription(SubscriptionData data) {
+        this.data = data;
+    }
+
+    /**
+     * Constructor - create a vanilla subscription
+     * 
+     * @param destination
+     */
+    public Subscription(String destination) {
+        this.data = new SubscriptionData();
+        Destination dest = new Destination(destination);
+        this.data.setDestinationData(dest.getData());
+    }
+
+    /**
+     * Constructor - create a vanilla subscription
+     * 
+     * @param destination
+     * @param topic
+     */
+    public Subscription(String destination, boolean topic) {
+        this.data = new SubscriptionData();
+        Destination dest = new Destination(destination);
+        this.data.setDestinationData(dest.getData());
+        setTopic(topic);
+    }
+
+    /**
+     * Constructor - create a vanilla subscription
+     * 
+     * @param destination
+     */
+    public Subscription(Buffer destination) {
+        this.data = new SubscriptionData();
+        Destination dest = new Destination(destination);
+        this.data.setDestinationData(dest.getData());
+    }
+
+    /**
+     * @return the underlying SubscriptionData
+     */
+    public SubscriptionData getData() {
+        return this.data;
+    }
+
+    /**
+     * set the durability of the subscription
+     * 
+     * @param value
+     */
+    public void setDurable(boolean value) {
+        this.data.setDurable(value);
+    }
+
+    /**
+     * @return true if durable
+     */
+    public boolean isDurable() {
+        return this.data.getDurable();
+    }
+
+    /**
+     * Set the weight - for Queues - higher weighted subscribers take precedence
+     * 
+     * @param value
+     */
+    public void setWeight(int value) {
+        this.data.setWeight(value);
+    }
+
+    /**
+     * @return the weight
+     */
+    public int getWeight() {
+        return this.data.getWeight();
+    }
+
+    /**
+     * Set the channel name of the subscription
+     * 
+     * @param value
+     */
+    public void setChannelName(String value) {
+        this.data.setChannelName(value);
+    }
+
+    /**
+     * @return the channel name of the subscription
+     */
+    public String getChannelName() {
+        return this.data.getChannelName();
+    }
+
+    /**
+     * Set the name of the subscriber
+     * 
+     * @param value
+     */
+    public void setSubscriberName(String value) {
+        this.data.setSubscriberName(value);
+    }
+
+    /**
+     * @return the name of the subscriber
+     */
+    public String getSubscriberName() {
+        return this.data.getSubscriberName();
+    }
+
+    /**
+     * Set a SQL92 selector
+     * 
+     * @param selector
+     */
+    public void setSelector(String selector) {
+        this.data.setSelector(selector);
+    }
+
+    /**
+     * @return the selector
+     */
+    public String getSelector() {
+        return this.data.getSelector();
+    }
+
+    /**
+     * Set the Destination
+     * 
+     * @param data
+     */
+    public void setDestination(DestinationData data) {
+        this.data.setDestinationData(data);
+    }
+
+    /**
+     * @return the Destination
+     */
+    public DestinationData getDestinationData() {
+        return this.data.getDestinationData();
+    }
+
+    /**
+     * @return the Destination
+     */
+    public Destination getDestination() {
+        return new Destination(this.data.getDestinationData());
+    }
+
+    /**
+     * @return the name
+     */
+    public Buffer getDestinationName() {
+        return this.getDestinationData().getName();
+    }
+
+    /**
+     * @param name
+     *            the name to set
+     */
+    public void setDestinationName(Buffer name) {
+        this.getDestinationData().setName(name);
+    }
+
+    /**
+     * @return the topic
+     */
+    public boolean isTopic() {
+        return this.getDestinationData().getTopic();
+    }
+
+    /**
+     * @param topic
+     *            the topic to set
+     */
+    public void setTopic(boolean topic) {
+        this.getDestinationData().setTopic(topic);
+    }
+
+    /**
+     * @return the temporary
+     */
+    public boolean isTemporary() {
+        return this.getDestinationData().getTopic();
+    }
+
+    /**
+     * @param temporary
+     *            the temporary to set
+     */
+    public void setTemporary(boolean temporary) {
+        this.getDestinationData().setTemporary(temporary);
+    }
+
+    public int hashCode() {
+        return this.data.getDestinationData().getName().hashCode();
+    }
+    
+    public boolean equals(Object obj) {
+        if( obj==this )
+           return true;
+        
+        if( obj==null || obj.getClass()!=Subscription.class )
+           return false;
+        
+        return equals((Subscription)obj);
+     }
+
+    /**
+     * @param other
+     * @return true if other is equivalent
+     */
+    public boolean equals(Subscription other) {
+        if (other != null) {
+            return this.data.getDestinationData().getName().equals(other.data.getDestinationData().getName());
+        }
+        return false;
+    }
+
+    /**
+     * Does this subscription match a destination
+     * 
+     * @param destination
+     * @return true if it matches a destination
+     */
+    public boolean matches(Destination destination) {
+        return matches(destination.getName());
+    }
+
+    /**
+     * Does this subscription match a destination
+     * 
+     * @param destination
+     * @return true if it matches a destination
+     */
+    public boolean matches(Buffer destination) {
+        return DestinationMatch.isMatch(this.data.getDestinationData().getName(), destination);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Sun Feb  1 23:35:54 2009
@@ -20,6 +20,8 @@
 import java.util.Set;
 import org.apache.activeblaze.BlazeChannel;
 import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeMessageListener;
+import org.apache.activeblaze.Subscription;
 
 /**
  * <P>
@@ -33,6 +35,11 @@
     public String getName();
 
     /**
+     * @param newName
+     */
+    public void setName(String newName);
+
+    /**
      * Send a message to an individual member
      * 
      * @param member
@@ -112,13 +119,13 @@
     /**
      * @return the inboxListener
      */
-    public BlazeQueueListener getInboxListener();
+    public BlazeMessageListener getInboxListener();
 
     /**
      * @param inboxListener
      *            the inboxListener to set
      */
-    public void setInboxListener(BlazeQueueListener inboxListener);
+    public void setInboxListener(BlazeMessageListener inboxListener);
 
     /**
      * @return the configuration
@@ -127,7 +134,7 @@
 
     /**
      * @return a set of the members
-     * @throws Exception 
+     * @throws Exception
      */
     public Set<Member> getMembers() throws Exception;
 
@@ -136,7 +143,7 @@
      * 
      * @param id
      * @return
-     * @throws Exception 
+     * @throws Exception
      */
     public Member getMemberById(String id) throws Exception;
 
@@ -145,17 +152,18 @@
      * 
      * @param name
      * @return
-     * @throws Exception 
+     * @throws Exception
      */
     public Member getMemberByName(String name) throws Exception;
-    
+
     /**
      * Will wait for a member to advertise itself if not available
+     * 
      * @param name
      * @param timeout
      * @return the member or null
-     * @throws InterruptedException 
-     * @throws Exception 
+     * @throws InterruptedException
+     * @throws Exception
      */
     public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException, Exception;
 
@@ -168,7 +176,7 @@
      * Add a listener for membership changes
      * 
      * @param l
-     * @throws Exception 
+     * @throws Exception
      */
     public void addMemberChangedListener(MemberChangedListener l) throws Exception;
 
@@ -176,7 +184,7 @@
      * Remove a listener for membership changes
      * 
      * @param l
-     * @throws Exception 
+     * @throws Exception
      */
     public void removeMemberChangedListener(MemberChangedListener l) throws Exception;
 
@@ -185,37 +193,59 @@
      * 
      * @param destination
      * @param l
-     * @throws Exception 
+     * @throws Exception
+     */
+    public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception;
+
+    /**
+     * Add a listener for messages
+     * 
+     * @param subscription
+     * @param l
+     * @throws Exception
      */
-    public void addBlazeQueueMessageListener(String destination, BlazeQueueListener l) throws Exception;
+    public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception;
 
     /**
      * Remove a listener for messages
      * 
      * @param destination
      * @return the removed listener
-     * @throws Exception 
+     * @throws Exception
+     */
+    public BlazeMessageListener removeBlazeQueueMessageListener(String destination) throws Exception;
+
+    /**
+     * Remove a listener for messages
+     * 
+     * @param subscription
+     * 
+     * @return the removed listener
+     * @throws Exception
      */
-    public BlazeQueueListener removeBlazeQueueMessageListener(String destination) throws Exception;
-    
+    public BlazeMessageListener removeBlazeQueueMessageListener(Subscription subscription) throws Exception;
+
     /**
      * Add member to a group
+     * 
      * @param groupName
-     * @throws Exception 
+     * @throws Exception
      */
     public void addToGroup(String groupName) throws Exception;
-    
+
     /**
      * remove member from a group
+     * 
      * @param groupName
-     * @throws Exception 
+     * @throws Exception
      */
-    public void removeFromGroup(String groupName)throws Exception;
-    
+    public void removeFromGroup(String groupName) throws Exception;
+
     /**
      * Get an array of groups
+     * 
      * @return an array of groups
-     * @throws Exception 
+     * @throws Exception
      */
     public List<String> getGroups() throws Exception;
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Sun Feb  1 23:35:54 2009
@@ -24,12 +24,12 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activeblaze.BlazeChannelImpl;
 import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeMessageListener;
 import org.apache.activeblaze.BlazeNoRouteException;
 import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.BlazeTopicListener;
 import org.apache.activeblaze.Destination;
 import org.apache.activeblaze.Processor;
-import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.CompressionProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
@@ -58,12 +58,12 @@
  */
 public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
     private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
-    private final String name;
+    private String name;
     protected Processor unicast;
     private MemberImpl local;
-    private BlazeQueueListener inboxListener;
+    private BlazeMessageListener inboxListener;
     protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(10000);
-    private Map<Buffer, BlazeQueueListener> queueMessageListenerMap = new ConcurrentHashMap<Buffer, BlazeQueueListener>();
+    private Map<Subscription, BlazeMessageListener> queueMessageListenerMap = new ConcurrentHashMap<Subscription, BlazeMessageListener>();
     private Group group;
     protected Buffer inboxAddress;
     protected int inBoxPort;
@@ -90,7 +90,7 @@
         URI unicastURI = new URI(unicastURIStr);
         BaseTransport transport = TransportFactory.get(unicastURI);
         transport.setName(getId() + "-Unicast");
-        this.unicast = configureProcess(transport,getConfiguration().getReliableUnicast());
+        this.unicast = configureProcess(transport, getConfiguration().getReliableUnicast());
         this.unicast.init();
         // if using a port of zero - the port will be assigned automatically,
         // so need to get the potentially new value
@@ -101,7 +101,7 @@
         this.local = createLocal(unicastURI);
         this.group = createGroup();
     }
-    
+
     protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
@@ -168,9 +168,19 @@
     }
 
     /**
+     * set the name
+     * 
+     * @param name
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#setName(java.lang.String)
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
      * @return the inboxListener
      */
-    public BlazeQueueListener getInboxListener() {
+    public BlazeMessageListener getInboxListener() {
         return this.inboxListener;
     }
 
@@ -178,7 +188,7 @@
      * @param inboxListener
      *            the inboxListener to set
      */
-    public void setInboxListener(BlazeQueueListener inboxListener) {
+    public void setInboxListener(BlazeMessageListener inboxListener) {
         this.inboxListener = inboxListener;
     }
 
@@ -280,12 +290,12 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
      */
     public void send(String destination, BlazeMessage message) throws Exception {
-        Buffer key = new Buffer(destination);
+        Buffer buffer = new Buffer(destination);
         while (true) {
-            MemberImpl member = getQueueDestination(key);
+            MemberImpl member = getQueueDestination(buffer);
             if (member != null) {
                 try {
-                    send(member, key, message);
+                    send(member, buffer, message);
                     return;
                 } catch (BlazeNoRouteException e) {
                     LOG.debug("No response - resending to another client", e);
@@ -398,7 +408,7 @@
             SendRequest request = new SendRequest();
             message.storeContent();
             BlazeData blazeData = message.getContent();
-            Destination dest = new Destination(destinationName,false);
+            Destination dest = new Destination(destinationName, false);
             blazeData.setDestinationData(dest.getData());
             PacketData packetData = getPacketData(blazeData.type(), blazeData);
             synchronized (this.messageRequests) {
@@ -439,10 +449,9 @@
     }
 
     protected void send(MemberImpl member, Buffer destinationName, BlazeData blazeData) throws Exception {
-       Destination dest= new Destination(destinationName,false);
-       blazeData.clearDestinationData();
+        Destination dest = new Destination(destinationName, false);
+        blazeData.clearDestinationData();
         blazeData.setDestinationData(dest.getData());
-        
         PacketData data = getPacketData(MessageType.BLAZE_DATA, blazeData);
         data.setReliable(true);
         data.setResponseRequired(true);
@@ -456,25 +465,51 @@
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
-     *      org.apache.activeblaze.group.BlazeQueueListener)
+     *      org.apache.activeblaze.group.BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(String destination, BlazeQueueListener l) throws Exception {
+    public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
+        init();
+        Subscription key = new Subscription(destination, false);
+        this.queueMessageListenerMap.put(key, l);
+        buildLocal();
+    }
+
+    /**
+     * @param key
+     * @param l
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
+     *      org.apache.activeblaze.group.BlazeMessageListener)
+     */
+    public void addBlazeQueueMessageListener(Subscription key, BlazeMessageListener l) throws Exception {
         init();
-        Buffer key = new Buffer(destination);
         this.queueMessageListenerMap.put(key, l);
         buildLocal();
     }
 
     /**
      * @param destination
-     * @return the removed <Code>BlazeQueueListener</Code>
+     * @return the removed <Code>BlazeMessageListener</Code>
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
-    public BlazeQueueListener removeBlazeQueueMessageListener(String destination) throws Exception {
+    public BlazeMessageListener removeBlazeQueueMessageListener(String destination) throws Exception {
         init();
-        Buffer key = new Buffer(destination);
-        BlazeQueueListener result = this.queueMessageListenerMap.remove(key);
+        Subscription key = new Subscription(destination, false);
+        BlazeMessageListener result = this.queueMessageListenerMap.remove(key);
+        buildLocal();
+        return result;
+    }
+
+    /**
+     * @param key
+     * @return the removed <Code>BlazeMessageListener</Code>
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
+     */
+    public BlazeMessageListener removeBlazeQueueMessageListener(Subscription key) throws Exception {
+        init();
+        BlazeMessageListener result = this.queueMessageListenerMap.remove(key);
         buildLocal();
         return result;
     }
@@ -484,9 +519,9 @@
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
-     *      org.apache.activeblaze.BlazeTopicListener)
+     *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception {
+    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
         init();
         super.addBlazeTopicMessageListener(destination, l);
         buildLocal();
@@ -498,9 +533,9 @@
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
      */
-    public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
+    public BlazeMessageListener removeBlazeTopicMessageListener(String destination) throws Exception {
         init();
-        BlazeTopicListener result = super.removeBlazeTopicMessageListener(destination);
+        BlazeMessageListener result = super.removeBlazeTopicMessageListener(destination);
         buildLocal();
         return result;
     }
@@ -572,8 +607,8 @@
             if (this.inboxListener != null && this.producerId.equals(destinationName)) {
                 this.inboxListener.onMessage(message);
             } else {
-                for (Map.Entry<Buffer, BlazeQueueListener> entry : this.queueMessageListenerMap.entrySet()) {
-                    if (DestinationMatch.isMatch(destinationName, entry.getKey())) {
+                for (Map.Entry<Subscription, BlazeMessageListener> entry : this.queueMessageListenerMap.entrySet()) {
+                    if (entry.getKey().matches(destinationName)) {
                         entry.getValue().onMessage(message);
                         break;
                     }
@@ -685,13 +720,14 @@
     protected MemberImpl getQueueDestination(Buffer destination) {
         // choose a member
         MemberImpl result = null;
-        Map<Buffer, List<MemberImpl>> map = this.group.getQueueMap();
-        List<MemberImpl> list = map.get(destination);
+        Map<Subscription, List<MemberImpl>> map = this.group.getQueueMap();
+        Subscription key = new Subscription(destination);
+        List<MemberImpl> list = map.get(key);
         if (list == null) {
             // search through wildcard matches
-            for (Buffer buffer : map.keySet()) {
-                if (DestinationMatch.isMatch(destination, buffer)) {
-                    list = map.get(destination);
+            for (Subscription s : map.keySet()) {
+                if (s.matches(destination)) {
+                    list = map.get(s);
                     break;
                 }
             }
@@ -709,24 +745,19 @@
             try {
                 synchronized (this.localMutex) {
                     MemberImpl result = new MemberImpl(getLocalMember().getData().clone());
-                    result.getData().clearDestination();
+                    result.getData().clearSubscriptionData();
                     // add topic destinations
-                    for (Buffer name : this.topicessageListenerMap.keySet()) {
-                        
-                        Destination dest = new Destination(name);
-                        
-                        result.getData().addDestination(dest.getData());
+                    for (Subscription s : this.topicessageListenerMap.keySet()) {
+                        result.getData().addSubscriptionData(s.getData());
                     }
                     // add Queue Destinations
-                    for (Buffer name : this.queueMessageListenerMap.keySet()) {
-                        Destination dest = new Destination(name,false);
-                        
-                        result.getData().addDestination(dest.getData());
+                    for (Subscription s : this.queueMessageListenerMap.keySet()) {
+                        result.getData().addSubscriptionData(s.getData());
                     }
                     this.group.processMemberUpdate(this.local, result);
-                    result.getData().setDestinationsChanged(true);
+                    result.getData().setSubscriptionsChanged(true);
                     this.group.broadcastHeartBeat(result);
-                    result.getData().clearDestinationsChanged();
+                    result.getData().clearSubscriptionsChanged();
                     this.local = result;
                     this.group.updateLocal(this.local);
                 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Sun Feb  1 23:35:54 2009
@@ -28,9 +28,11 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.wire.DestinationData;
 import org.apache.activeblaze.wire.MemberData;
 import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.SubscriptionData;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,8 +49,8 @@
     private Timer checkMemberShipTimer;
     protected Map<String, MemberImpl> members = new ConcurrentHashMap<String, MemberImpl>();
     private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
-    private final Map<Buffer, List<MemberImpl>> queueMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
-    private final Map<Buffer, List<MemberImpl>> topicMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
+    private final Map<Subscription, List<MemberImpl>> queueMap = new ConcurrentHashMap<Subscription, List<MemberImpl>>();
+    private final Map<Subscription, List<MemberImpl>> topicMap = new ConcurrentHashMap<Subscription, List<MemberImpl>>();
     private final Object memberMutex = new Object();
     protected ExecutorService listenerService;
 
@@ -286,7 +288,7 @@
                 }
                 result = member;
             } else {
-                if (data.getDestinationsChanged()) {
+                if (data.getSubscriptionsChanged()) {
                     processMemberUpdate(old, member);
                 }
             }
@@ -349,15 +351,15 @@
     }
 
     private void processDestinationsForStarted(MemberImpl member) {
-        List<DestinationData> destList = member.getData().getDestinationList();
-        for (DestinationData dest : destList) {
-            Buffer key = dest.getName();
-            Map<Buffer, List<MemberImpl>> map = null;
-            if (dest.getTopic()) {
+        List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
+        for (SubscriptionData subData : subscriptionList) {
+            Map<Subscription, List<MemberImpl>> map = null;
+            if (subData.getDestinationData().getTopic()) {
                 map = this.topicMap;
             } else {
                 map = this.queueMap;
             }
+            Subscription key = new Subscription(subData);
             List<MemberImpl> members = map.get(key);
             if (members == null) {
                 members = new CopyOnWriteArrayList<MemberImpl>();
@@ -368,15 +370,15 @@
     }
 
     private void processDestinationsForStopped(MemberImpl member) {
-        List<DestinationData> destList = member.getData().getDestinationList();
-        for (DestinationData dest : destList) {
-            Buffer key = dest.getName();
-            Map<Buffer, List<MemberImpl>> map = null;
-            if (dest.getTopic()) {
+        List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
+        for (SubscriptionData subData : subscriptionList) {
+            Map<Subscription, List<MemberImpl>> map = null;
+            if (subData.getDestinationData().getTopic()) {
                 map = this.topicMap;
             } else {
                 map = this.queueMap;
             }
+            Subscription key = new Subscription(subData);
             List<MemberImpl> members = map.get(key);
             if (members != null) {
                 members.remove(member);
@@ -395,14 +397,14 @@
     /**
      * @return the queueMap
      */
-    protected Map<Buffer, List<MemberImpl>> getQueueMap() {
+    protected Map<Subscription, List<MemberImpl>> getQueueMap() {
         return this.queueMap;
     }
 
     /**
      * @return the topicMap
      */
-    protected Map<Buffer, List<MemberImpl>> getTopicMap() {
+    protected Map<Subscription, List<MemberImpl>> getTopicMap() {
         return this.topicMap;
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Sun Feb  1 23:35:54 2009
@@ -92,8 +92,6 @@
             SocketAddress to = packet.getTo();
             DatagramPacket dp = new DatagramPacket(data, data.length, to);
             this.socket.send(dp);
-        } else {
-            throw new BlazeException("Not initialized");
         }
     }
 

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import org.apache.activeblaze.group.BlazeGroupChannel;
+import org.apache.activeblaze.util.IdGenerator;
+
+/**
+ * Implementation of a JMS Connection
+ * 
+ */
+public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
+        org.apache.activeblaze.ExceptionListener {
+    protected final BlazeGroupChannel channel;
+    protected final IdGenerator tempDestinationGenerator = new IdGenerator("");;
+    private String clientId;
+    private boolean clientIdSet;
+    private ExceptionListener exceptionListener;
+    private List<Session> sessions = new CopyOnWriteArrayList<Session>();
+    private boolean closed;
+
+    protected BlazeJmsConnection(BlazeGroupChannel channel) {
+        this.channel = channel;
+        this.channel.setExceptionListener(this);
+        this.clientId = channel.getName();
+    }
+
+    /**
+     * @throws JMSException
+     * @see javax.jms.Connection#close()
+     */
+    public void close() throws JMSException {
+        this.closed = true;
+        try {
+            for (Session s : this.sessions) {
+                s.close();
+            }
+            this.sessions.clear();
+            this.channel.shutDown();
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param destination
+     * @param messageSelector
+     * @param sessionPool
+     * @param maxMessages
+     * @return ConnectionConsumer
+     * @throws JMSException
+     * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination, java.lang.String,
+     *      javax.jms.ServerSessionPool, int)
+     */
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosed();
+        return null;
+    }
+
+    /**
+     * @param topic
+     * @param subscriptionName
+     * @param messageSelector
+     * @param sessionPool
+     * @param maxMessages
+     * @return ConnectionConsumer
+     * @throws JMSException
+     * @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic, java.lang.String, java.lang.String,
+     *      javax.jms.ServerSessionPool, int)
+     */
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+            String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosed();
+        return null;
+    }
+
+    /**
+     * @param transacted
+     * @param acknowledgeMode
+     * @return Session
+     * @throws JMSException
+     * @see javax.jms.Connection#createSession(boolean, int)
+     */
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosed();
+        int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+        Session result = new BlazeJmsSession(this, ackMode);
+        addSession(result);
+        return result;
+    }
+
+    /**
+     * @return clientId
+     * @see javax.jms.Connection#getClientID()
+     */
+    public String getClientID() {
+        return this.clientId;
+    }
+
+    /**
+     * @return ExceptionListener
+     * @see javax.jms.Connection#getExceptionListener()
+     */
+    public ExceptionListener getExceptionListener() {
+        return this.exceptionListener;
+    }
+
+    /**
+     * @return ConnectionMetaData
+     * @see javax.jms.Connection#getMetaData()
+     */
+    public ConnectionMetaData getMetaData() {
+        return BlazeJmsConnectionMetaData.INSTANCE;
+    }
+
+    /**
+     * @param clientID
+     * @throws JMSException
+     * @see javax.jms.Connection#setClientID(java.lang.String)
+     */
+    public void setClientID(String clientID) throws JMSException {
+        if (this.channel.isStarted() && this.clientIdSet) {
+            throw new IllegalStateException("The clientID has already been set");
+        }
+        this.clientId = clientID;
+        this.clientIdSet = true;
+        this.channel.setName(clientID);
+    }
+
+    /**
+     * @param listener
+     * @see javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)
+     */
+    public void setExceptionListener(ExceptionListener listener) {
+        this.exceptionListener = listener;
+    }
+
+    /**
+     * @throws JMSException
+     * @see javax.jms.Connection#start()
+     */
+    public void start() throws JMSException {
+        checkClosed();
+        try {
+            this.channel.start();
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @throws JMSException
+     * @see javax.jms.Connection#stop()
+     */
+    public void stop() throws JMSException {
+        checkClosed();
+        try {
+            this.channel.stop();
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param topic
+     * @param messageSelector
+     * @param sessionPool
+     * @param maxMessages
+     * @return ConnectionConsumer
+     * @throws JMSException
+     * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic, java.lang.String,
+     *      javax.jms.ServerSessionPool, int)
+     */
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosed();
+        return null;
+    }
+
+    /**
+     * @param transacted
+     * @param acknowledgeMode
+     * @return TopicSession
+     * @throws JMSException
+     * @see javax.jms.TopicConnection#createTopicSession(boolean, int)
+     */
+    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosed();
+        int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+        TopicSession result = new BlazeJmsTopicSession(this, ackMode);
+        addSession(result);
+        return result;
+    }
+
+    /**
+     * @param queue
+     * @param messageSelector
+     * @param sessionPool
+     * @param maxMessages
+     * @return ConnectionConsumer
+     * @throws JMSException
+     * @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue, java.lang.String,
+     *      javax.jms.ServerSessionPool, int)
+     */
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosed();
+        return null;
+    }
+
+    /**
+     * @param transacted
+     * @param acknowledgeMode
+     * @return QueueSession
+     * @throws JMSException
+     * @see javax.jms.QueueConnection#createQueueSession(boolean, int)
+     */
+    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        checkClosed();
+        int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+        QueueSession result = new BlazeJmsQueueSession(this, ackMode);
+        addSession(result);
+        return result;
+    }
+
+    /**
+     * @param ex
+     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+     */
+    public void onException(Exception ex) {
+        ExceptionListener l = this.exceptionListener;
+        if (l != null) {
+            l.onException(BlazeJmsExceptionSupport.create(ex));
+        }
+    }
+
+    /**
+     * @return the channel
+     */
+    protected BlazeGroupChannel getChannel() {
+        return this.channel;
+    }
+
+    protected int getSessionAcknowledgeMode(boolean transacted, int acknowledgeMode) throws JMSException {
+        int result = acknowledgeMode;
+        if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
+            throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
+        }
+        if (transacted) {
+            result = Session.SESSION_TRANSACTED;
+        }
+        return result;
+    }
+
+    protected void removeSession(Session s) {
+        this.sessions.remove(s);
+    }
+
+    protected void addSession(Session s) {
+        this.sessions.add(s);
+    }
+
+    protected void checkClosed() throws IllegalStateException {
+        if (this.closed) {
+            throw new IllegalStateException("The MessageProducer is closed");
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionClosedException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionClosedException.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionClosedException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionClosedException.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * An exception thrown when attempt is made to use a connection when the connection has been closed.
+ *
+ * @version $Revision: 1.2 $
+ */
+public class BlazeJmsConnectionClosedException extends IllegalStateException {
+    private static final long serialVersionUID = -7681404582227153308L;
+
+    public BlazeJmsConnectionClosedException() {
+        super("The connection is already closed", "AlreadyClosed");
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionClosedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionClosedException.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.group.BlazeGroupChannelFactory;
+import org.apache.activeblaze.group.BlazeGroupConfiguration;
+import org.apache.activeblaze.jndi.JNDIStorable;
+import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.util.PropertyUtil;
+
+/**
+ * Jms ConnectionFactory implementation
+ * 
+ */
+public class BlazeJmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory,
+        TopicConnectionFactory {
+    private static final IdGenerator NAME_GENERATOR = new IdGenerator();
+    private final BlazeGroupChannelFactory groupChannelFactory;
+    private Map<String, String> props = new HashMap<String, String>();
+
+    /**
+     * Constructor
+     */
+    public BlazeJmsConnectionFactory() {
+        this(new BlazeGroupConfiguration());
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param config
+     */
+    public BlazeJmsConnectionFactory(BlazeGroupConfiguration config) {
+        this.groupChannelFactory = new BlazeGroupChannelFactory(config);
+    }
+
+    /**
+     * @return the underlying <Code>BlazeGroupConfiguration</Code>
+     */
+    public BlazeGroupConfiguration getConfiguration() {
+        return this.groupChannelFactory.getConfiguration();
+    }
+
+    /**
+     * Set properties
+     * @param props
+     */
+    public void setProperties(Properties props) {
+        Map<String,String> map = new HashMap<String, String>();
+        for (Map.Entry<Object,Object> entry: props.entrySet()) {
+            map.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+        setProperties(map);
+    }
+
+    public void setProperties(Map<String, String> map) {
+        populateProperties(map);
+    }
+
+    /**
+     * @param props
+     * @see org.apache.activeblaze.jndi.JNDIStorable#buildFromProperties(Map<String, String> map)
+     */
+    protected void buildFromProperties(Map<String, String> map) {
+        PropertyUtil.setProperties(this.groupChannelFactory.getConfiguration(), map);
+    }
+
+    /**
+     * 
+     * @param map
+     * @see org.apache.activeblaze.jndi.JNDIStorable#populateProperties(Map<String, String> map)
+     */
+    protected void populateProperties(Map<String, String> map) {
+        try {
+            PropertyUtil.setProperties(this.groupChannelFactory.getConfiguration(), map);
+            this.props.putAll(map);
+        } catch (Exception e) {
+            throw new BlazeRuntimeException(e);
+        }
+    }
+
+    /**
+     * @return a TopicConnection
+     * @throws JMSException
+     * @see javax.jms.TopicConnectionFactory#createTopicConnection()
+     */
+    public TopicConnection createTopicConnection() throws JMSException {
+        try {
+            TopicConnection result = new BlazeJmsConnection(this.groupChannelFactory.createGroupChannel(NAME_GENERATOR
+                    .generateId()));
+            PropertyUtil.setProperties(result, this.props);
+            return result;
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param userName
+     * @param password
+     * @return a TopicConnection
+     * @throws JMSException
+     * @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String, java.lang.String)
+     */
+    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
+        try {
+            TopicConnection result = new BlazeJmsConnection(this.groupChannelFactory.createGroupChannel(NAME_GENERATOR
+                    .generateId()));
+            PropertyUtil.setProperties(result, this.props);
+            return result;
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @return a Connection
+     * @throws JMSException
+     * @see javax.jms.ConnectionFactory#createConnection()
+     */
+    public Connection createConnection() throws JMSException {
+        try {
+            Connection result = new BlazeJmsConnection(this.groupChannelFactory.createGroupChannel(NAME_GENERATOR
+                    .generateId()));
+            PropertyUtil.setProperties(result, this.props);
+            return result;
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param userName
+     * @param password
+     * @return Connection
+     * @throws JMSException
+     * @see javax.jms.ConnectionFactory#createConnection(java.lang.String, java.lang.String)
+     */
+    public Connection createConnection(String userName, String password) throws JMSException {
+        try {
+            Connection result = new BlazeJmsConnection(this.groupChannelFactory.createGroupChannel(NAME_GENERATOR
+                    .generateId()));
+            PropertyUtil.setProperties(result, this.props);
+            return result;
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @return a QueueConnection
+     * @throws JMSException
+     * @see javax.jms.QueueConnectionFactory#createQueueConnection()
+     */
+    public QueueConnection createQueueConnection() throws JMSException {
+        try {
+            QueueConnection result = new BlazeJmsConnection(this.groupChannelFactory.createGroupChannel(NAME_GENERATOR
+                    .generateId()));
+            PropertyUtil.setProperties(result, this.props);
+            return result;
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param userName
+     * @param password
+     * @return a QueueConnection
+     * @throws JMSException
+     * @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String, java.lang.String)
+     */
+    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
+        try {
+            QueueConnection result = new BlazeJmsConnection(this.groupChannelFactory.createGroupChannel(NAME_GENERATOR
+                    .generateId()));
+            PropertyUtil.setProperties(result, this.props);
+            return result;
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message