activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r741060 [2/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ main/java/org/apache/activeblaze/impl/reliable/swp/ main/java/or...
Date Thu, 05 Feb 2009 09:44:02 GMT
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Thu Feb  5 09:44:01 2009
@@ -42,9 +42,13 @@
  */
 public class UdpTransport extends BaseTransport {
     private DatagramChannel channel;
+
     private ByteBuffer inBuffer;
+
     private ByteBuffer outBuffer;
-    private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(1000);
+
+    private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(
+            1000);
 
     public void doInit() throws Exception {
         super.doInit();
@@ -52,7 +56,8 @@
         DatagramSocket socket = this.channel.socket();
         SocketAddress address = null;
         if (getLocalURI() != null) {
-            address = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort());
+            address = new InetSocketAddress(getLocalURI().getHost(),
+                    getLocalURI().getPort());
         } else {
             throw new BlazeException("localURI not set");
         }
@@ -65,8 +70,9 @@
         // if the port was 0 - the port will be allocated automatically -
         // so need to reset the local uri
         URI oldURI = getLocalURI();
-        URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), socket.getLocalPort(), oldURI
-                .getPath(), oldURI.getQuery(), oldURI.getFragment());
+        URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI
+                .getHost(), socket.getLocalPort(), oldURI.getPath(), oldURI
+                .getQuery(), oldURI.getFragment());
         setLocalURI(newURI);
         this.inBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
         this.outBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
@@ -94,7 +100,8 @@
                 stream.close();
                 if (data.getResponse()) {
                     synchronized (this.messageRequests) {
-                        SendRequest request = this.messageRequests.remove(data.getCorrelationId());
+                        SendRequest request = this.messageRequests.remove(data
+                                .getCorrelationId());
                         if (request != null) {
                             request.put(data.getMessageId(), data);
                         }
@@ -121,7 +128,8 @@
             if (packet.isResponseRequired()) {
                 synchronized (this.messageRequests) {
                     request = new SendRequest();
-                    this.messageRequests.put(packet.getPacketData().getMessageId(), request);
+                    this.messageRequests.put(packet.getPacketData()
+                            .getMessageId(), request);
                 }
             }
             synchronized (buffer) {
@@ -138,11 +146,15 @@
             }
             if (request != null) {
                 if (request.get(getSoTimeout()) == null) {
-                    throw new BlazeNoRouteException("No response in " + getSoTimeout() + " ms from " + packet.getTo());
+                    throw new BlazeNoRouteException("No response in "
+                            + getSoTimeout() + " ms from " + packet.getTo());
                 }
             }
         } else {
-            throw new BlazeException(this + " Not started - cannot send " + packet);
+            if (!shutDown()) {
+                throw new BlazeException(this + " Not started - cannot send "
+                        + packet);
+            }
         }
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Thu Feb  5 09:44:01 2009
@@ -33,6 +33,7 @@
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
+import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.group.BlazeGroupChannel;
 import org.apache.activeblaze.util.IdGenerator;
 
@@ -43,17 +44,22 @@
 public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
         org.apache.activeblaze.ExceptionListener {
     protected final BlazeGroupChannel channel;
-    protected final IdGenerator tempDestinationGenerator = new IdGenerator("");;
+    protected final IdGenerator tempDestinationGenerator = new IdGenerator("");
     private String clientId;
     private boolean clientIdSet;
     private ExceptionListener exceptionListener;
     private List<Session> sessions = new CopyOnWriteArrayList<Session>();
+    private final BlazeMessageDispatcher queueDispatcher;
+    private final BlazeMessageDispatcher topicDispatcher;
     private boolean closed;
+    private int consumerMaxDispatchQueueDepth=10000;
 
     protected BlazeJmsConnection(BlazeGroupChannel channel) {
         this.channel = channel;
         this.channel.setExceptionListener(this);
         this.clientId = channel.getName();
+        this.queueDispatcher = new BlazeQueueMessageDispatcher(this);
+        this.topicDispatcher = new BlazeTopicMessageDispatcher(this);
     }
 
     /**
@@ -154,6 +160,9 @@
         if (this.channel.isStarted() && this.clientIdSet) {
             throw new IllegalStateException("The clientID has already been set");
         }
+        if (clientID == null) {
+            throw new IllegalStateException("Cannot have a null clientID");
+        }
         this.clientId = clientID;
         this.clientIdSet = true;
         this.channel.setName(clientID);
@@ -259,7 +268,16 @@
      * @param ex
      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
      */
+    
     public void onException(Exception ex) {
+       onException(BlazeJmsExceptionSupport.create(ex));
+    }
+    
+    /**
+     * @param ex
+     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+     */
+    public void onException(JMSException ex) {
         ExceptionListener l = this.exceptionListener;
         if (l != null) {
             l.onException(BlazeJmsExceptionSupport.create(ex));
@@ -297,4 +315,30 @@
             throw new IllegalStateException("The MessageProducer is closed");
         }
     }
+
+    protected void addMesssageDispatcher(BlazeJmsConsumer consumer, Subscription s) throws JMSException {
+        BlazeMessageDispatcher dispatcher = s.isTopic() ? this.topicDispatcher : this.queueDispatcher;
+        dispatcher.add(consumer, s);
+    }
+
+    protected void removeMesssageDispatcher(BlazeJmsConsumer consumer, Subscription s) throws JMSException {
+        BlazeMessageDispatcher dispatcher = s.isTopic() ? this.topicDispatcher : this.queueDispatcher;
+        dispatcher.remove(consumer);
+    }
+
+    /**
+     * Get the consumerMaxDispatchQueueDepth
+     * @return the consumerMaxDispatchQueueDepth
+     */
+    public int getConsumerMaxDispatchQueueDepth() {
+        return consumerMaxDispatchQueueDepth;
+    }
+
+    /**
+     * Set the consumerMaxDispatchQueueDepth
+     * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set
+     */
+    public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {
+        this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java Thu Feb  5 09:44:01 2009
@@ -16,6 +16,12 @@
  */
 package org.apache.activeblaze.jms;
 
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.group.BlazeGroupChannelFactory;
+import org.apache.activeblaze.group.BlazeGroupConfiguration;
+import org.apache.activeblaze.jndi.JNDIStorable;
+import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.util.PropertyUtil;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -26,22 +32,17 @@
 import javax.jms.QueueConnectionFactory;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
-import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.group.BlazeGroupChannelFactory;
-import org.apache.activeblaze.group.BlazeGroupConfiguration;
-import org.apache.activeblaze.jndi.JNDIStorable;
-import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.util.PropertyUtil;
 
 /**
  * Jms ConnectionFactory implementation
  * 
  */
 public class BlazeJmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory,
-        TopicConnectionFactory {
+TopicConnectionFactory {
     private static final IdGenerator NAME_GENERATOR = new IdGenerator();
     private final BlazeGroupChannelFactory groupChannelFactory;
-    private Map<String, String> props = new HashMap<String, String>();
+    private final Map<String, String> props = new HashMap<String, String>();
+    private int consumerMaxDispatchQueueDepth=10000;
 
     /**
      * Constructor
@@ -78,6 +79,7 @@
         setProperties(map);
     }
 
+    @Override
     public void setProperties(Map<String, String> map) {
         populateProperties(map);
     }
@@ -86,8 +88,10 @@
      * @param props
      * @see org.apache.activeblaze.jndi.JNDIStorable#buildFromProperties(Map<String, String> map)
      */
+    @Override
     protected void buildFromProperties(Map<String, String> map) {
         PropertyUtil.setProperties(this.groupChannelFactory.getConfiguration(), map);
+        PropertyUtil.setProperties(this, map);
     }
 
     /**
@@ -95,9 +99,13 @@
      * @param map
      * @see org.apache.activeblaze.jndi.JNDIStorable#populateProperties(Map<String, String> map)
      */
+    @Override
     protected void populateProperties(Map<String, String> map) {
         try {
-            PropertyUtil.setProperties(this.groupChannelFactory.getConfiguration(), map);
+            Map<String, String> result = PropertyUtil.getProperties(this.groupChannelFactory.getConfiguration());
+            map.putAll(result);
+            result = PropertyUtil.getProperties(this);
+            map.putAll(result);
             this.props.putAll(map);
         } catch (Exception e) {
             throw new BlazeRuntimeException(e);
@@ -205,4 +213,20 @@
             throw BlazeJmsExceptionSupport.create(e);
         }
     }
+
+    /**
+     * Get the consumerMaxDispatchQueueDepth
+     * @return the consumerMaxDispatchQueueDepth
+     */
+    public int getConsumerMaxDispatchQueueDepth() {
+        return consumerMaxDispatchQueueDepth;
+    }
+
+    /**
+     * Set the consumerMaxDispatchQueueDepth
+     * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set
+     */
+    public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {
+        this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth;
+    }
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+
+/**
+ * Used internally - a listener for BlazeJmsMessages
+ *
+ */
+public interface BlazeJmsConsumer {
+    /**
+     * Consume a message
+     * @param message
+     */
+    public void onMessage(BlazeJmsMessage message);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java Thu Feb  5 09:44:01 2009
@@ -33,7 +33,7 @@
  */
 public class BlazeJmsDestination extends JNDIStorable implements Externalizable, javax.jms.Destination,
         Comparable<BlazeJmsDestination> {
-    protected transient Destination destination;
+    protected transient final Destination destination;
 
     /**
      * Constructor
@@ -57,6 +57,7 @@
      * @param name
      */
     public BlazeJmsDestination(String name) {
+        this();
         this.destination.setName(new Buffer(name));
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java Thu Feb  5 09:44:01 2009
@@ -16,6 +16,14 @@
  */
 package org.apache.activeblaze.jms;
 
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -24,26 +32,37 @@
 
 /**
  * implementation of a Jms Message Consumer
- *
+ * 
  */
-public class BlazeJmsMessageConsumer implements MessageConsumer {
- protected final BlazeJmsSession session;
- protected final BlazeJmsDestination destination;
- private boolean closed;
- private MessageListener messageListener;
- private String messageSelector = "";
- 
- protected BlazeJmsMessageConsumer(BlazeJmsSession s,BlazeJmsDestination destination) {
-     this.session=s;
-     this.destination=destination;
- }
+public class BlazeJmsMessageConsumer implements MessageConsumer, BlazeJmsConsumer {
+    protected final BlazeJmsSession session;
+    protected final BlazeJmsDestination destination;
+    protected final Subscription subscription = new Subscription();
+    private boolean closed;
+    private MessageListener messageListener;
+    private String messageSelector = "";
+    private final Lock lock = new ReentrantLock();
+    private LinkedBlockingQueue<BlazeJmsMessage> dispatchQueue;
+
+    protected BlazeJmsMessageConsumer(BlazeJmsSession s, BlazeJmsDestination destination,int queueDepth) {
+        this.session = s;
+        this.destination = destination;
+        this.subscription.setDestination(this.destination.getDestination().getData());
+        this.dispatchQueue= new LinkedBlockingQueue<BlazeJmsMessage>(queueDepth);
+    }
+
     /**
+     * @throws JMSException 
      * @see javax.jms.MessageConsumer#close()
      */
-    public void close() {
-        this.closed=true;
+    public void close() throws JMSException {
+        this.closed = true;
         this.session.remove(this);
-        
+       
+    }
+
+    protected Subscription getSubscription() {
+     * @return QueueReceiver
     }
 
     /**
@@ -52,8 +71,8 @@
      * @see javax.jms.MessageConsumer#getMessageListener()
      */
     public MessageListener getMessageListener() throws JMSException {
-       checkClosed();
-       return this.messageListener;
+        checkClosed();
+        return this.messageListener;
     }
 
     /**
@@ -62,8 +81,8 @@
      * @see javax.jms.MessageConsumer#getMessageSelector()
      */
     public String getMessageSelector() throws JMSException {
-       checkClosed();
-       return this.messageSelector;
+        checkClosed();
+        return this.messageSelector;
     }
 
     /**
@@ -73,7 +92,7 @@
      */
     public Message receive() throws JMSException {
         checkClosed();
-        return null;
+        return this.dispatchQueue.poll();
     }
 
     /**
@@ -84,7 +103,11 @@
      */
     public Message receive(long timeout) throws JMSException {
         checkClosed();
-        return null;
+        try {
+            return this.dispatchQueue.poll(timeout,TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
     }
 
     /**
@@ -94,7 +117,11 @@
      */
     public Message receiveNoWait() throws JMSException {
         checkClosed();
-        return null;
+        Message result =  this.dispatchQueue.peek();
+        if (result != null){
+            this.dispatchQueue.remove(result);
+        }
+        return result;
     }
 
     /**
@@ -104,23 +131,52 @@
      */
     public void setMessageListener(MessageListener listener) throws JMSException {
         checkClosed();
-        this.messageListener=listener;
-        
+        this.lock.lock();
+        try{
+        this.messageListener = listener;
+        if (!this.dispatchQueue.isEmpty() && this.messageListener != null){
+            List<BlazeJmsMessage> drain = new ArrayList<BlazeJmsMessage>(this.dispatchQueue.size());
+            this.dispatchQueue.drainTo(drain);
+            for (BlazeJmsMessage m:drain){
+                this.messageListener.onMessage(m);
+            }
+            drain.clear();
+        }
+        }finally{
+            this.lock.unlock();
+        }
     }
-    
+
     /**
-     * @param messageSelector the messageSelector to set
-     * @throws IllegalStateException 
+     * @param messageSelector
+     *            the messageSelector to set
+     * @throws IllegalStateException
      */
     public void setMessageSelector(String messageSelector) throws IllegalStateException {
         checkClosed();
         this.messageSelector = messageSelector;
     }
-    
+
     protected void checkClosed() throws IllegalStateException {
         if (this.closed) {
             throw new IllegalStateException("The MessageProducer is closed");
         }
     }
-   
+
+    /**
+     * @param message
+     * @see org.apache.activeblaze.jms.BlazeJmsConsumer#onMessage(org.apache.activeblaze.jms.message.BlazeJmsMessage)
+     */
+    public void onMessage(BlazeJmsMessage message) {
+        this.lock.lock();
+        try{
+        if (this.messageListener != null) {
+            this.messageListener.onMessage(message);
+        }else{
+            this.dispatchQueue.add(message);
+        }
+        }finally{
+            lock.unlock();
+        }
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java Thu Feb  5 09:44:01 2009
@@ -30,8 +30,8 @@
      * Constructor
      * @param s
      */
-    protected BlazeJmsQueueReceiver(BlazeJmsSession s,BlazeJmsDestination d) {
-        super(s,d);
+    protected BlazeJmsQueueReceiver(BlazeJmsSession s,BlazeJmsDestination d,int queueDepth) {
+        super(s,d,queueDepth);
     }
 
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java Thu Feb  5 09:44:01 2009
@@ -46,6 +46,7 @@
 import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage;
 import org.apache.activeblaze.jms.message.BlazeJmsMapMessage;
 import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation;
 import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage;
 import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage;
 import org.apache.activeblaze.jms.message.BlazeJmsTextMessage;
@@ -135,7 +136,11 @@
      */
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
         checkClosed();
-        return new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination));
+        BlazeJmsDestination dest = BlazeJmsDestination.transform(destination);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsMessageConsumer result =  new BlazeJmsMessageConsumer(this,dest,queueDepth );
+        add(result);
+        return result;
     }
 
     /**
@@ -147,8 +152,11 @@
      */
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
         checkClosed();
-        BlazeJmsMessageConsumer result = new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination));
+        BlazeJmsDestination dest = BlazeJmsDestination.transform(destination);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsMessageConsumer result = new BlazeJmsMessageConsumer(this, dest,queueDepth);
         result.setMessageSelector(messageSelector);
+        add(result);
         return result;
     }
 
@@ -164,8 +172,11 @@
             throws JMSException {
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(destination);
-        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, NoLocal);
+       
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, NoLocal,queueDepth);
         result.setMessageSelector(messageSelector);
+        add(result);
         return result;
     }
 
@@ -179,7 +190,9 @@
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
-        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, false);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, false,queueDepth);
+        add(result);
         return result;
     }
 
@@ -196,8 +209,10 @@
             throws JMSException {
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
-        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, noLocal);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, noLocal,queueDepth);
         result.setMessageSelector(messageSelector);
+        add(result);
         return result;
     }
 
@@ -254,6 +269,7 @@
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(destination);
         BlazeJmsMessageProducer result = new BlazeJmsMessageProducer(this, dest);
+        add(result);
         return result;
     }
 
@@ -407,14 +423,16 @@
 
     /**
      * @param queue
-     * @return
+     * @return QueueRecevier
      * @throws JMSException
      * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
      */
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(queue);
-        BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest,queueDepth);
+        add(result);
         return result;
     }
 
@@ -428,8 +446,10 @@
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(queue);
-        BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest,queueDepth);
         result.setMessageSelector(messageSelector);
+        add(result);
         return result;
     }
 
@@ -456,6 +476,7 @@
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
         BlazeJmsTopicPublisher result = new BlazeJmsTopicPublisher(this, dest);
+        add(result);
         return result;
     }
 
@@ -468,7 +489,9 @@
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
-        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, false);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, false,queueDepth);
+        add(result);
         return result;
     }
 
@@ -483,15 +506,26 @@
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
         checkClosed();
         BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
-        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, noLocal);
+        int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth();
+        BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, noLocal,queueDepth);
         result.setMessageSelector(messageSelector);
         return result;
     }
+    
+    protected void add(BlazeJmsMessageConsumer consumer) throws JMSException {
+        this.consumers.add(consumer);
+        this.connection.addMesssageDispatcher(consumer, consumer.getSubscription());
+    }
 
-    protected void remove(MessageConsumer consumer) {
+    protected void remove(BlazeJmsMessageConsumer consumer) throws JMSException {
         this.consumers.remove(consumer);
+        this.connection.removeMesssageDispatcher(consumer, consumer.getSubscription());
     }
 
+    protected void add(MessageProducer producer) {
+        this.producers.add(producer);
+    }
+    
     protected void remove(MessageProducer producer) {
         this.producers.remove(producer);
     }
@@ -515,9 +549,9 @@
         }
         try {
             if (destination.isTopic()) {
-                this.connection.channel.send(destination.getName(), message);
+                this.connection.channel.broadcast(destination.getDestination(), message);
             } else {
-                this.connection.channel.broadcast(destination.getName(), message);
+                this.connection.channel.send(destination.getDestination(), message);
             }
         } catch (Exception e) {
             throw BlazeJmsExceptionSupport.create(e);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java Thu Feb  5 09:44:01 2009
@@ -25,22 +25,18 @@
  * 
  */
 public class BlazeJmsTopicSubscriber extends BlazeJmsMessageConsumer implements TopicSubscriber {
-    private final boolean durable;
-    private final boolean noLocal;
-    private String name;
-
     /**
      * Constructor
      * 
      * @param s
      * @param destination
      */
-    protected BlazeJmsTopicSubscriber(BlazeJmsSession s, BlazeJmsDestination destination, 
-            String name, boolean durable,boolean noLocal) {
-        super(s, destination);
-        this.name = name;
-        this.durable = durable;
-        this.noLocal = noLocal;
+    protected BlazeJmsTopicSubscriber(BlazeJmsSession s, BlazeJmsDestination destination, String name, boolean durable,
+            boolean noLocal,int queueDepth) {
+        super(s, destination,queueDepth);
+        getSubscription().setSubscriberName(name);
+        getSubscription().setDurable(durable);
+        getSubscription().setNoLocal(noLocal);
     }
 
     /**
@@ -50,7 +46,7 @@
      */
     public boolean getNoLocal() throws IllegalStateException {
         checkClosed();
-        return this.noLocal;
+        return getSubscription().isNoLocal();
     }
 
     /**
@@ -69,7 +65,7 @@
      */
     public boolean isDurable() throws IllegalStateException {
         checkClosed();
-        return this.durable;
+        return getSubscription().isDurable();
     }
 
     /**
@@ -78,16 +74,6 @@
      */
     public String getName() throws IllegalStateException {
         checkClosed();
-        return this.name;
-    }
-
-    /**
-     * @param name
-     *            the name to set
-     * @throws IllegalStateException
-     */
-    public void setName(String name) throws IllegalStateException {
-        checkClosed();
-        this.name = name;
+        return getSubscription().getSubscriberName();
     }
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeMessageListener;
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.JMSException;
+
+/**
+ * Base dispatcher: tracks the consumers of each Subscription and hands converted Blaze messages to subclasses
+ */
+abstract class BlazeMessageDispatcher implements BlazeMessageListener {
+    protected Map<Subscription, List<BlazeJmsConsumer>> subscriptionMap = new LinkedHashMap<Subscription, List<BlazeJmsConsumer>>();
+    protected Map<BlazeJmsConsumer, Subscription> listenerMap = new HashMap<BlazeJmsConsumer, Subscription>();
+    protected final BlazeJmsConnection connection;
+
+    BlazeMessageDispatcher(BlazeJmsConnection connection) {
+        this.connection = connection;
+    }
+
+    /** Registers c under s; the first consumer of s also adds s to the channel. */
+    void add(BlazeJmsConsumer c, Subscription s) throws JMSException {
+        synchronized (this.subscriptionMap) {
+            List<BlazeJmsConsumer> list = this.subscriptionMap.get(s);
+            if (list == null) {
+                list = new CopyOnWriteArrayList<BlazeJmsConsumer>();
+                this.subscriptionMap.put(s, list);
+                addSubscriptionToChannel(s);
+            }
+            list.add(c);
+        }
+        synchronized (this.listenerMap) {
+            this.listenerMap.put(c, s);
+        }
+    }
+
+    /** Unregisters c; the last consumer of a subscription also removes it from the channel. */
+    void remove(BlazeJmsConsumer c) throws JMSException {
+        Subscription s = null;
+        synchronized (this.listenerMap) {
+            s = this.listenerMap.remove(c);
+        }
+        if (s != null) {
+            // one lock scope: lookup, removal and channel update must be atomic with respect to add()
+            synchronized (this.subscriptionMap) {
+                List<BlazeJmsConsumer> list = this.subscriptionMap.get(s);
+                if (list != null) {
+                    list.remove(c);
+                    if (list.isEmpty()) {
+                        this.subscriptionMap.remove(s);
+                        removeSubscriptionToChannel(s);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Converts the Blaze message to its JMS form and dispatches it; a JMSException goes to the connection. */
+    public void onMessage(BlazeMessage message) {
+        try {
+            BlazeJmsMessage result = BlazeJmsMessageTransformation.transformMessage(message);
+            processMessage(result);
+        } catch (JMSException e) {
+            this.connection.onException(e);
+        }
+    }
+
+    // Hooks implemented by the concrete dispatchers.
+    protected abstract void addSubscriptionToChannel(Subscription s) throws JMSException;
+    protected abstract void removeSubscriptionToChannel(Subscription s) throws JMSException;
+    protected abstract void processMessage(BlazeJmsMessage message);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import org.apache.activeblaze.Destination;
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import java.util.List;
+import java.util.Map;
+import javax.jms.JMSException;
+/**
+ * Dispatcher of Blaze queue messages: each message is handed to at most one
+ * consumer, taken from the first matching subscription that has consumers
+ */
+public class BlazeQueueMessageDispatcher extends BlazeMessageDispatcher {
+    /**
+     * Constructor
+     * 
+     * @param connection the connection whose channel this dispatcher registers with
+     */
+    BlazeQueueMessageDispatcher(BlazeJmsConnection connection) {
+        super(connection);
+    }
+
+    /**
+     * @param s the subscription to register as a queue listener on the channel
+     * @throws JMSException wrapping any exception raised by the channel
+     * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#addSubscriptionToChannel(org.apache.activeblaze.Subscription)
+     */
+    @Override
+    protected void addSubscriptionToChannel(Subscription s) throws JMSException {
+        try {
+            this.connection.channel.addBlazeQueueMessageListener(s, this);
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param s the subscription to unregister; channel failures are rethrown as JMSException
+     * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#removeSubscriptionToChannel(org.apache.activeblaze.Subscription)
+     */
+    @Override
+    protected void removeSubscriptionToChannel(Subscription s) throws JMSException {
+        try {
+            this.connection.channel.removeBlazeQueueMessageListener(s);
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Delivers the message to a single consumer, rotating consumers and matching subscriptions round-robin.
+     * @param jmsMsg the message to dispatch; it is dropped when no matching subscription has a consumer
+     */
+    public void processMessage(BlazeJmsMessage jmsMsg) {
+        BlazeJmsConsumer target = null;
+        Destination destination = jmsMsg.getDestination();
+        synchronized (this.subscriptionMap) {
+            for (Map.Entry<Subscription, List<BlazeJmsConsumer>> entry : this.subscriptionMap.entrySet()) {
+                if (entry.getKey().matches(destination)) {
+                    List<BlazeJmsConsumer> list = entry.getValue();
+                    if (list.isEmpty()) {
+                        continue; // no consumer left here; try the next matching subscription
+                    }
+                    target = list.remove(0); // take the head consumer ...
+                    list.add(target); // ... and queue it again at the tail
+                    this.subscriptionMap.remove(entry.getKey());
+                    this.subscriptionMap.put(entry.getKey(), entry.getValue());
+                    break;
+                }
+            }
+        }
+        if (target != null) {
+            target.onMessage(jmsMsg);
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import org.apache.activeblaze.Destination;
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import java.util.List;
+import java.util.Map;
+import javax.jms.JMSException;
+/**
+ * Dispatcher of Blaze topic messages: every consumer of every matching
+ * subscription receives its own clone of the message
+ */
+public class BlazeTopicMessageDispatcher extends BlazeMessageDispatcher {
+    /**
+     * Constructor
+     * 
+     * @param connection the connection whose channel this dispatcher registers with
+     */
+    BlazeTopicMessageDispatcher(BlazeJmsConnection connection) {
+        super(connection);
+    }
+
+    /**
+     * @param s the subscription to register as a topic listener on the channel
+     * @throws JMSException wrapping any exception raised by the channel
+     * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#addSubscriptionToChannel(org.apache.activeblaze.Subscription)
+     */
+    @Override
+    protected void addSubscriptionToChannel(Subscription s) throws JMSException {
+        try {
+            this.connection.channel.addBlazeTopicMessageListener(s, this);
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * @param s the subscription to unregister; channel failures are rethrown as JMSException
+     * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#removeSubscriptionToChannel(org.apache.activeblaze.Subscription)
+     */
+    @Override
+    protected void removeSubscriptionToChannel(Subscription s) throws JMSException {
+        try {
+            this.connection.channel.removeBlazeTopicMessageListener(s);
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Delivers a clone of the message to each consumer of each subscription matching its destination.
+     * @param jmsMsg the message to dispatch; consumers are called while the subscription map lock is held
+     */
+    public void processMessage(BlazeJmsMessage jmsMsg) {
+        Destination destination = jmsMsg.getDestination();
+        synchronized (this.subscriptionMap) {
+            for (Map.Entry<Subscription, List<BlazeJmsConsumer>> entry : this.subscriptionMap.entrySet()) {
+                if (entry.getKey().matches(destination)) {
+                    List<BlazeJmsConsumer> list = entry.getValue();
+                    for (BlazeJmsConsumer c : list) {
+                        c.onMessage(jmsMsg.clone());
+                    }
+                }
+            }
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java Thu Feb  5 09:44:01 2009
@@ -29,6 +29,7 @@
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
@@ -100,6 +101,14 @@
         copy.bytesOut = null;
         copy.dataIn = null;
     }
+    
+    /**
+     * @return the ordinal of JmsMessageType.BYTES
+     * @see org.apache.activeblaze.BlazeMessage#getType()
+     */
+    public int getType(){
+        return JmsMessageType.BYTES.ordinal();
+    }
 
     public void storeContent() {
         super.storeContent();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java Thu Feb  5 09:44:01 2009
@@ -27,6 +27,7 @@
 import javax.jms.MessageNotWriteableException;
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activeblaze.wire.MapData;
 import org.apache.activemq.protobuf.Buffer;
@@ -98,6 +99,14 @@
         storeContent();
         super.copy(copy);
     }
+    
+    /**
+     * @return the ordinal of JmsMessageType.MAP
+     * @see org.apache.activeblaze.BlazeMessage#getType()
+     */
+    public int getType(){
+        return JmsMessageType.MAP.ordinal();
+    }
 
     public void storeContent() {
         super.storeContent();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java Thu Feb  5 09:44:01 2009
@@ -35,10 +35,16 @@
  * 
  */
 public class BlazeJmsMessage extends BlazeMessage implements Message {
+   
+    protected enum JmsMessageType{MESSAGE,BYTES,MAP,OBJECT,STREAM,TEXT}
     protected transient Callback<BlazeJmsMessage> acknowledgeCallback;
     protected transient BlazeJmsDestination jmsDestination;
     protected transient BlazeJmsDestination jmsReplyToDestination;
 
+    /** 
+     * @return clone of a BlazeMessage
+     * @see org.apache.activeblaze.BlazeMessage#clone()
+     */
     public BlazeJmsMessage clone() {
         BlazeJmsMessage copy = new BlazeJmsMessage();
         try {
@@ -55,6 +61,14 @@
         copy.jmsDestination = this.jmsDestination;
         copy.jmsReplyToDestination = this.jmsReplyToDestination;
     }
+    
+    /**
+     * @return the ordinal of JmsMessageType.MESSAGE
+     * @see org.apache.activeblaze.BlazeMessage#getType()
+     */
+    public int getType(){
+        return JmsMessageType.MESSAGE.ordinal();
+    }
 
     /**
      * @return the acknowledge <Code>Callback</Code>
@@ -261,12 +275,12 @@
     }
 
     /**
-     * @return
+     * @return the message type
      * @throws JMSException
      * @see javax.jms.Message#getJMSType()
      */
     public String getJMSType() throws JMSException {
-        return getType();
+        return getMessageType();
     }
 
     /**
@@ -470,7 +484,7 @@
      * @see javax.jms.Message#setJMSType(java.lang.String)
      */
     public void setJMSType(String type) {
-        setType(type);
+        setMessageType(type);
     }
 
     /**

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java (from r739885, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java&r1=739885&r2=741060&rev=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java Thu Feb  5 09:44:01 2009
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.jms;
+package org.apache.activeblaze.jms.message;
 
 import java.util.Enumeration;
 import javax.jms.BytesMessage;
@@ -26,15 +26,11 @@
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsMapMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsTextMessage;
-
+import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.jms.BlazeJmsDestination;
 /**
- * A helper class for converting normal JMS interfaces into ActiveMQ specific ones.
+ * A helper class for converting normal JMS interfaces into ActiveMQ specific
+ * ones.
  * 
  * @version $Revision: 1.1 $
  */
@@ -45,12 +41,37 @@
     /**
      * @param dest
      * @return a BlazeJmsDestination
-     * @throws JMSException 
+     * @throws JMSException
      */
     private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException {
         return BlazeJmsDestination.transform(dest);
     }
-    
+
+    /**
+     * @param message the BlazeMessage whose getType() selects the JMS message class and whose content is copied
+     * @return a new BlazeJmsMessage (a subclass for BYTES, MAP, OBJECT, STREAM or TEXT) holding the same content
+     * @throws JMSException
+     */
+    public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException {
+        BlazeJmsMessage result = null;
+        int type = message.getType();
+        if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) {
+            result = new BlazeJmsBytesMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) {
+            result = new BlazeJmsMapMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) {
+            result = new BlazeJmsObjectMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) {
+            result = new BlazeJmsStreamMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) {
+            result = new BlazeJmsTextMessage();
+        } else {
+            result = new BlazeJmsMessage();
+        }
+        result.setContent(message.getContent());
+        return result;
+    }
+
     /**
      * @param message
      * @return a BlazeJmsDestination
@@ -118,7 +139,8 @@
     }
 
     /**
-     * Copies the standard JMS and user defined properties from the givem message to the specified message
+     * Copies the standard JMS and user defined properties from the given
+     * message to the specified message
      * 
      * @param fromMessage
      *            the message to take the properties from
@@ -144,6 +166,4 @@
             toMessage.setObjectProperty(name, obj);
         }
     }
-
-   
 }

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java Thu Feb  5 09:44:01 2009
@@ -27,6 +27,7 @@
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
@@ -75,6 +76,14 @@
         super.copy(copy);
         copy.object = null;
     }
+    
+    /**
+     * @return the ordinal of JmsMessageType.OBJECT
+     * @see org.apache.activeblaze.BlazeMessage#getType()
+     */
+    public int getType(){
+        return JmsMessageType.OBJECT.ordinal();
+    }
 
     public void storeContent() {
         super.storeContent();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java Thu Feb  5 09:44:01 2009
@@ -30,6 +30,7 @@
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
@@ -119,6 +120,14 @@
         copy.bytesOut = null;
         copy.dataIn = null;
     }
+    
+    /**
+     * @return the ordinal of JmsMessageType.STREAM
+     * @see org.apache.activeblaze.BlazeMessage#getType()
+     */
+    public int getType(){
+        return JmsMessageType.STREAM.ordinal();
+    }
 
     public void storeContent() {
         super.storeContent();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java Thu Feb  5 09:44:01 2009
@@ -23,6 +23,7 @@
 import javax.jms.TextMessage;
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
 import org.apache.activemq.protobuf.BufferOutputStream;
@@ -53,6 +54,14 @@
         super.copy(copy);
         copy.text = this.text;
     }
+    
+    /**
+     * @return the ordinal of JmsMessageType.TEXT
+     * @see org.apache.activeblaze.BlazeMessage#getType()
+     */
+    public int getType(){
+        return JmsMessageType.TEXT.ordinal();
+    }
 
     public void setText(String text) {
         this.text = text;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java Thu Feb  5 09:44:01 2009
@@ -22,8 +22,6 @@
 import java.io.ObjectOutput;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
-
 import javax.naming.NamingException;
 import javax.naming.Reference;
 import javax.naming.Referenceable;

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Thu Feb  5 09:44:01 2009
@@ -55,7 +55,7 @@
     message AckData {
      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
        //| option java_type_method = "MessageType";
-       required int64 id =1;
+       optional int64 id =1;
        optional int64 startSequence =2;
        optional int64 endSequence =3;
        optional int64 sessionId = 4;
@@ -66,7 +66,7 @@
     message NackData {
      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
        //| option java_type_method = "MessageType";
-       required int64 id =1;
+       optional int64 id =1;
        optional int64 startSequence =2;
        optional int64 endSequence =3;
        optional int64 sessionId = 4;
@@ -75,7 +75,7 @@
     message ControlData {
      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
        //| option java_type_method = "MessageType";
-       required int64 lastId =1; //last ack or nack id
+       optional int64 lastId =1; //last ack or nack id
     }
     
     message DestinationData {
@@ -85,12 +85,13 @@
     }
     
     message SubscriptionData {
-      optional bool durable =1;
-      optional int32 weight = 2;
-      optional string channelName =3;
-      optional string subscriberName =4;
-      optional string selector =5;
-      optional DestinationData destinationData =6;
+      optional bool durable = 1;
+      optional bool noLocal = 2;
+      optional int32 weight = 3;
+      optional string channelName = 4;
+      optional string subscriberName = 5;
+      optional string selector = 6;
+      optional DestinationData destinationData = 7;
     }
     
     message MemberData {
@@ -241,15 +242,16 @@
      optional bool persistent = 1;
      optional int32 priority = 2;
      optional int32 redeliveryCounter = 3;
-     optional int64 timestamp = 4;
-     optional int64 expiration = 5;
-     optional bytes messageId = 6;
-     optional bytes correlationId = 7;
-     optional bytes fromId =8;
-     optional bytes type = 9;
-     optional bytes payload = 10;
-     optional DestinationData destinationData = 11;  
-     optional DestinationData replyToData = 12;  
+     optional int32 type =4;
+     optional int64 timestamp = 5;
+     optional int64 expiration = 6;
+     optional bytes messageId = 7;
+     optional bytes correlationId = 8;
+     optional bytes fromId =9;
+     optional bytes messageType = 10;
+     optional bytes payload = 11;
+     optional DestinationData destinationData = 12;  
+     optional DestinationData replyToData = 13;  
      optional MapData mapData = 14;
      optional bytes payload = 15;
       

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.perf;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class PerfConsumer implements MessageListener {
+    private static final Log LOG = LogFactory.getLog(PerfConsumer.class);
+    protected Connection connection;
+    protected MessageConsumer consumer;
+    protected long sleepDuration;
+    protected long initialDelay;
+    protected boolean firstMessage = true;
+    protected PerfRate rate = new PerfRate();
+
+    /**
+     * Creates a connection, a non-transacted auto-acknowledge session and a
+     * consumer on the given destination, and registers this object as the
+     * consumer's message listener.
+     *
+     * @param fac          factory used to create the connection
+     * @param dest         destination to consume from
+     * @param consumerName client id and, for topics, durable subscription
+     *                     name; when <code>null</code> the client id is
+     *                     "Consumer" and a plain consumer is created
+     * @throws JMSException if any of the JMS set-up calls fails; the
+     *                      connection is closed before the exception is
+     *                      rethrown
+     */
+    public PerfConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException {
+        connection = fac.createConnection();
+        try {
+            connection.setClientID(consumerName != null ? consumerName : "Consumer");
+            Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) {
+                consumer = s.createDurableSubscriber((Topic) dest, consumerName);
+            } else {
+                consumer = s.createConsumer(dest);
+            }
+            consumer.setMessageListener(this);
+        } catch (JMSException e) {
+            // Do not leak the half-initialised connection when set-up fails.
+            closeQuietly();
+            throw e;
+        }
+    }
+
+    /**
+     * Same as the three-argument constructor with a <code>null</code>
+     * consumer name.
+     */
+    public PerfConsumer(ConnectionFactory fac, Destination dest) throws JMSException {
+        this(fac, dest, null);
+    }
+
+    /**
+     * Starts the connection and begins a fresh rate sample.
+     */
+    public void start() throws JMSException {
+        connection.start();
+        rate.reset();
+    }
+
+    /**
+     * Stops the connection; it is not closed.
+     */
+    public void stop() throws JMSException {
+        connection.stop();
+    }
+
+    /**
+     * Closes the connection.
+     */
+    public void shutDown() throws JMSException {
+        connection.close();
+    }
+
+    /**
+     * @return the counter that is incremented once per received message
+     */
+    public PerfRate getRate() {
+        return rate;
+    }
+
+    /**
+     * Counts the message and then applies the configured throttling: the
+     * initial delay is slept once, before the first message is counted, and
+     * the sleep duration is slept after every message.
+     */
+    public void onMessage(Message msg) {
+        if (firstMessage) {
+            firstMessage = false;
+            pause(getInitialDelay());
+        }
+        rate.increment();
+        pause(getSleepDuration());
+    }
+
+    public synchronized long getSleepDuration() {
+        return sleepDuration;
+    }
+
+    public synchronized void setSleepDuration(long sleepDuration) {
+        this.sleepDuration = sleepDuration;
+    }
+
+    /**
+     * @return the initialDelay
+     */
+    public synchronized long getInitialDelay() {
+        return initialDelay;
+    }
+
+    /**
+     * @param initialDelay
+     *            the initialDelay to set
+     */
+    public synchronized void setInitialDelay(long initialDelay) {
+        this.initialDelay = initialDelay;
+    }
+
+    /**
+     * Sleeps for the given number of milliseconds; values of zero or less
+     * are ignored. An interrupt ends the sleep early and is re-asserted on
+     * the current thread instead of being swallowed.
+     */
+    private void pause(long millis) {
+        if (millis <= 0) {
+            return;
+        }
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Closes the connection, logging rather than propagating a failure so
+     * that the original set-up exception is the one the caller sees.
+     */
+    private void closeQuietly() {
+        try {
+            connection.close();
+        } catch (JMSException closeFailure) {
+            LOG.warn("Failed to close connection after set-up error", closeFailure);
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.perf;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class PerfProducer implements Runnable {
+    protected Connection connection;
+    protected MessageProducer producer;
+    protected PerfRate rate = new PerfRate();
+    private byte[] payload;
+    private Session session;
+    private final CountDownLatch stopped = new CountDownLatch(1);
+    private boolean running;
+    private int sleep = 0;
+
+    public PerfProducer(ConnectionFactory fac, Destination dest, byte[] palyload) throws JMSException {
+        connection = fac.createConnection();
+        connection.setClientID("Producer");
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(dest);
+        this.payload = palyload;
+    }
+
+    public void setDeliveryMode(int mode) throws JMSException {
+        producer.setDeliveryMode(mode);
+    }
+    
+    public void setTimeToLive(int ttl) throws JMSException {
+        producer.setTimeToLive(ttl);
+    }
+
+    public void shutDown() throws JMSException {
+        connection.close();
+    }
+
+    public PerfRate getRate() {
+        return rate;
+    }
+
+    public synchronized void start() throws JMSException {
+        if (!running) {
+            rate.reset();
+            running = true;
+            connection.start();
+            Thread t = new  Thread(this);
+            t.setName("Producer");
+            t.start();
+        }
+    }
+
+    public void stop() throws JMSException, InterruptedException {
+        synchronized (this) {
+            running = false;
+        }
+        stopped.await(1,TimeUnit.SECONDS);
+        connection.stop();
+    }
+
+    public synchronized boolean isRunning() {
+        return running;
+    }
+
+    public void run() {
+        try {
+            while (isRunning()) {
+                BytesMessage msg;
+                msg = session.createBytesMessage();
+                msg.writeBytes(payload);
+                producer.send(msg);
+                rate.increment();
+                if (sleep > 0) {
+                    Thread.sleep(sleep);
+                }
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        } finally {
+            stopped.countDown();
+        }
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.perf;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class PerfRate {
+
+    protected int totalCount;
+    protected int count;
+    protected long startTime = System.currentTimeMillis();
+
+    /**
+     * Despite its name this accessor reports the running total, exactly as
+     * {@link #getTotalCount()} does; the per-sample counter is only exposed
+     * through {@link #getRate()}.
+     *
+     * @return the running total of increments
+     */
+    public synchronized int getCount() {
+        return totalCount;
+    }
+
+    /**
+     * Records one event in both the running total and the current sample.
+     */
+    public synchronized void increment() {
+        totalCount++;
+        count++;
+    }
+
+    /**
+     * Computes the events-per-second rate of the current sample, i.e. the
+     * sample counter scaled by the time elapsed since the sample started.
+     *
+     * @return the rate in events per second; 0 when no measurable time has
+     *         elapsed yet (which previously caused a division by zero)
+     */
+    public synchronized int getRate() {
+        long elapsedMillis = System.currentTimeMillis() - startTime;
+        if (elapsedMillis <= 0) {
+            return 0;
+        }
+        // Multiply as long: count * 1000 overflows int above ~2.1M events.
+        long perSecond = (count * 1000L) / elapsedMillis;
+        return (int) Math.min(perSecond, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Takes a snapshot of this rate and then starts a new sample.
+     *
+     * @return a copy holding the total, the sample counter and the sample
+     *         start time as they were before the reset
+     */
+    public synchronized PerfRate cloneAndReset() {
+        PerfRate rc = new PerfRate();
+        rc.totalCount = totalCount;
+        rc.count = count;
+        rc.startTime = startTime;
+        count = 0;
+        startTime = System.currentTimeMillis();
+        return rc;
+    }
+
+    /**
+     * Resets the rate sampling: clears the sample counter and restarts the
+     * sample clock. The running total is left untouched.
+     */
+    public synchronized void reset() {
+        count = 0;
+        startTime = System.currentTimeMillis();
+    }
+
+    /**
+     * @return Returns the totalCount.
+     */
+    public synchronized int getTotalCount() {
+        return totalCount;
+    }
+
+    /**
+     * @param totalCount The totalCount to set.
+     */
+    public synchronized void setTotalCount(int totalCount) {
+        this.totalCount = totalCount;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.perf;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activeblaze.jms.BlazeJmsConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class SimpleTopicTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
+
+    protected PerfProducer[] producers;
+    protected PerfConsumer[] consumers;
+    protected String destinationName = getClass().getName();
+    protected int sampleCount = 20;
+    protected long sampleInternal = 10000;
+    protected int numberOfDestinations=1;
+    protected int numberOfConsumers = 1;
+    protected int numberofProducers = 1;
+    protected int totalNumberOfProducers;
+    protected int totalNumberOfConsumers;
+    protected int playloadSize = 12;
+    protected byte[] array;
+    protected ConnectionFactory factory;
+
+    /**
+     * Sets up a test where the producer and consumer have their own connection.
+     * A temporary connection is used only to create the destinations and is
+     * closed again before this method returns, even when set-up fails.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        factory = createConnectionFactory();
+        Connection con = factory.createConnection();
+        try {
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s) per " + numberOfDestinations + " Destination(s)");
+
+            totalNumberOfConsumers = numberOfConsumers * numberOfDestinations;
+            totalNumberOfProducers = numberofProducers * numberOfDestinations;
+            producers = new PerfProducer[totalNumberOfProducers];
+            consumers = new PerfConsumer[totalNumberOfConsumers];
+            int consumerCount = 0;
+            int producerCount = 0;
+            for (int k = 0; k < numberOfDestinations; k++) {
+                Destination destination = createDestination(session, destinationName + ":" + k);
+                LOG.info("Testing against destination: " + destination);
+                for (int i = 0; i < numberOfConsumers; i++) {
+                    consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
+                    consumerCount++;
+                }
+                for (int i = 0; i < numberofProducers; i++) {
+                    array = new byte[playloadSize];
+                    // Fill starts at index i, so the first i bytes stay zero
+                    // (kept as in the original).
+                    for (int j = i; j < array.length; j++) {
+                        array[j] = (byte)j;
+                    }
+                    producers[producerCount] = createProducer(factory, destination, i, array);
+                    producerCount++;
+                }
+            }
+        } finally {
+            con.close();
+        }
+        super.setUp();
+    }
+
+    /**
+     * Shuts down every consumer and producer created by {@link #setUp()},
+     * across all destinations. Slots left empty by a failed set-up are
+     * skipped.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (consumers != null) {
+                for (int i = 0; i < consumers.length; i++) {
+                    if (consumers[i] != null) {
+                        consumers[i].shutDown();
+                    }
+                }
+            }
+            if (producers != null) {
+                for (int i = 0; i < producers.length; i++) {
+                    if (producers[i] != null) {
+                        producers[i].shutDown();
+                    }
+                }
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Factory method for the destination under test; this implementation
+     * creates a topic with the given name.
+     */
+    protected Destination createDestination(Session s, String destinationName) throws JMSException {
+        return s.createTopic(destinationName);
+    }
+
+    /**
+     * Factory method for a producer. The <code>number</code> argument is
+     * not used by this implementation.
+     */
+    protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
+        return new PerfProducer(fac, dest, payload);
+    }
+
+    /**
+     * Factory method for a consumer. The <code>number</code> argument is
+     * not used by this implementation.
+     */
+    protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
+        return new PerfConsumer(fac, dest);
+    }
+
+    /**
+     * Builds a Blaze connection factory configured with "swp" reliable
+     * broadcast, an empty management URI and a static broadcast URI listing
+     * one udp://localhost port, counting up from 61616, per producer and
+     * consumer.
+     */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        int portNum = 61616;
+        // NOTE(review): count ignores numberOfDestinations, and the list ends
+        // with a trailing comma before ")" - confirm both are intended.
+        int count = this.numberofProducers + this.numberOfConsumers;
+        StringBuilder uri = new StringBuilder("static://(");
+        for (int i = 0; i < count; i++) {
+            uri.append("udp://localhost:").append(portNum++);
+            uri.append(",");
+        }
+        uri.append(")");
+        BlazeJmsConnectionFactory result = new BlazeJmsConnectionFactory();
+        result.getConfiguration().setReliableBroadcast("swp");
+        result.getConfiguration().setManagementURI("");
+        result.getConfiguration().setBroadcastURI(uri.toString());
+        return result;
+    }
+
+    /**
+     * Starts all consumers and then all producers, prints the producer and
+     * consumer rates once per sample interval, and finally stops the
+     * producers followed by the consumers.
+     */
+    public void testPerformance() throws JMSException, InterruptedException {
+        for (int i = 0; i < totalNumberOfConsumers; i++) {
+            consumers[i].start();
+        }
+        for (int i = 0; i < totalNumberOfProducers; i++) {
+            producers[i].start();
+        }
+        LOG.info("Sampling performance " + sampleCount + " times at a " + sampleInternal + " ms interval.");
+        for (int i = 0; i < sampleCount; i++) {
+            Thread.sleep(sampleInternal);
+            dumpProducerRate();
+            dumpConsumerRate();
+        }
+        for (int i = 0; i < totalNumberOfProducers; i++) {
+            producers[i].stop();
+        }
+        for (int i = 0; i < totalNumberOfConsumers; i++) {
+            consumers[i].stop();
+        }
+    }
+
+    /**
+     * Snapshots and resets every producer's rate, then prints the average
+     * rate, the summed rate and the summed total count. Prints nothing when
+     * there are no producers.
+     */
+    protected void dumpProducerRate() {
+        if (producers == null || producers.length == 0) {
+            return;
+        }
+        int totalRate = 0;
+        int totalCount = 0;
+        for (int i = 0; i < producers.length; i++) {
+            PerfRate rate = producers[i].getRate().cloneAndReset();
+            totalRate += rate.getRate();
+            totalCount += rate.getTotalCount();
+        }
+        int avgRate = totalRate / producers.length;
+        System.out.println("Avg producer rate = " + avgRate
+                + " msg/sec | Total rate = " + totalRate + ", sent = "
+                + totalCount);
+    }
+
+    /**
+     * Snapshots and resets every consumer's rate, then prints the average
+     * rate, the summed rate and the summed total count, followed by one
+     * line with the per-consumer figures. Prints nothing when there are no
+     * consumers.
+     */
+    protected void dumpConsumerRate() {
+        if (consumers == null || consumers.length == 0) {
+            return;
+        }
+        int totalRate = 0;
+        int totalCount = 0;
+        StringBuilder consumerString = new StringBuilder("Consumers:");
+        for (int i = 0; i < consumers.length; i++) {
+            PerfRate rate = consumers[i].getRate().cloneAndReset();
+            // Read the rate once so the total and the detail line agree.
+            int sampleRate = rate.getRate();
+            totalRate += sampleRate;
+            totalCount += rate.getTotalCount();
+            consumerString.append("[").append(i).append(":").append(sampleRate)
+                    .append(",").append(rate.getTotalCount()).append("];");
+        }
+        int avgRate = totalRate / consumers.length;
+        System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount);
+        System.out.println(consumerString.toString());
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message