Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Thu Feb 5 09:44:01 2009 @@ -42,9 +42,13 @@ */ public class UdpTransport extends BaseTransport { private DatagramChannel channel; + private ByteBuffer inBuffer; + private ByteBuffer outBuffer; - private Map messageRequests = new LRUCache(1000); + + private Map messageRequests = new LRUCache( + 1000); public void doInit() throws Exception { super.doInit(); @@ -52,7 +56,8 @@ DatagramSocket socket = this.channel.socket(); SocketAddress address = null; if (getLocalURI() != null) { - address = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort()); + address = new InetSocketAddress(getLocalURI().getHost(), + getLocalURI().getPort()); } else { throw new BlazeException("localURI not set"); } @@ -65,8 +70,9 @@ // if the port was 0 - the port will be allocated automatically - // so need to reset the local uri URI oldURI = getLocalURI(); - URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), socket.getLocalPort(), oldURI - .getPath(), oldURI.getQuery(), oldURI.getFragment()); + URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI + .getHost(), socket.getLocalPort(), oldURI.getPath(), oldURI + .getQuery(), oldURI.getFragment()); setLocalURI(newURI); this.inBuffer = ByteBuffer.allocateDirect(getMaxPacketSize()); this.outBuffer = ByteBuffer.allocateDirect(getMaxPacketSize()); @@ -94,7 +100,8 @@ stream.close(); if (data.getResponse()) { 
synchronized (this.messageRequests) { - SendRequest request = this.messageRequests.remove(data.getCorrelationId()); + SendRequest request = this.messageRequests.remove(data + .getCorrelationId()); if (request != null) { request.put(data.getMessageId(), data); } @@ -121,7 +128,8 @@ if (packet.isResponseRequired()) { synchronized (this.messageRequests) { request = new SendRequest(); - this.messageRequests.put(packet.getPacketData().getMessageId(), request); + this.messageRequests.put(packet.getPacketData() + .getMessageId(), request); } } synchronized (buffer) { @@ -138,11 +146,15 @@ } if (request != null) { if (request.get(getSoTimeout()) == null) { - throw new BlazeNoRouteException("No response in " + getSoTimeout() + " ms from " + packet.getTo()); + throw new BlazeNoRouteException("No response in " + + getSoTimeout() + " ms from " + packet.getTo()); } } } else { - throw new BlazeException(this + " Not started - cannot send " + packet); + if (!shutDown()) { + throw new BlazeException(this + " Not started - cannot send " + + packet); + } } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Thu Feb 5 09:44:01 2009 @@ -33,6 +33,7 @@ import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; +import org.apache.activeblaze.Subscription; import org.apache.activeblaze.group.BlazeGroupChannel; import org.apache.activeblaze.util.IdGenerator; @@ -43,17 +44,22 @@ public class BlazeJmsConnection implements Connection, TopicConnection, 
QueueConnection, org.apache.activeblaze.ExceptionListener { protected final BlazeGroupChannel channel; - protected final IdGenerator tempDestinationGenerator = new IdGenerator("");; + protected final IdGenerator tempDestinationGenerator = new IdGenerator(""); private String clientId; private boolean clientIdSet; private ExceptionListener exceptionListener; private List sessions = new CopyOnWriteArrayList(); + private final BlazeMessageDispatcher queueDispatcher; + private final BlazeMessageDispatcher topicDispatcher; private boolean closed; + private int consumerMaxDispatchQueueDepth=10000; protected BlazeJmsConnection(BlazeGroupChannel channel) { this.channel = channel; this.channel.setExceptionListener(this); this.clientId = channel.getName(); + this.queueDispatcher = new BlazeQueueMessageDispatcher(this); + this.topicDispatcher = new BlazeTopicMessageDispatcher(this); } /** @@ -154,6 +160,9 @@ if (this.channel.isStarted() && this.clientIdSet) { throw new IllegalStateException("The clientID has already been set"); } + if (clientID == null) { + throw new IllegalStateException("Cannot have a null clientID"); + } this.clientId = clientID; this.clientIdSet = true; this.channel.setName(clientID); @@ -259,7 +268,16 @@ * @param ex * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception) */ + public void onException(Exception ex) { + onException(BlazeJmsExceptionSupport.create(ex)); + } + + /** + * @param ex + * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception) + */ + public void onException(JMSException ex) { ExceptionListener l = this.exceptionListener; if (l != null) { l.onException(BlazeJmsExceptionSupport.create(ex)); @@ -297,4 +315,30 @@ throw new IllegalStateException("The MessageProducer is closed"); } } + + protected void addMesssageDispatcher(BlazeJmsConsumer consumer, Subscription s) throws JMSException { + BlazeMessageDispatcher dispatcher = s.isTopic() ? 
this.topicDispatcher : this.queueDispatcher; + dispatcher.add(consumer, s); + } + + protected void removeMesssageDispatcher(BlazeJmsConsumer consumer, Subscription s) throws JMSException { + BlazeMessageDispatcher dispatcher = s.isTopic() ? this.topicDispatcher : this.queueDispatcher; + dispatcher.remove(consumer); + } + + /** + * Get the consumerMaxDispatchQueueDepth + * @return the consumerMaxDispatchQueueDepth + */ + public int getConsumerMaxDispatchQueueDepth() { + return consumerMaxDispatchQueueDepth; + } + + /** + * Set the consumerMaxDispatchQueueDepth + * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set + */ + public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) { + this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth; + } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java Thu Feb 5 09:44:01 2009 @@ -16,6 +16,12 @@ */ package org.apache.activeblaze.jms; +import org.apache.activeblaze.BlazeRuntimeException; +import org.apache.activeblaze.group.BlazeGroupChannelFactory; +import org.apache.activeblaze.group.BlazeGroupConfiguration; +import org.apache.activeblaze.jndi.JNDIStorable; +import org.apache.activeblaze.util.IdGenerator; +import org.apache.activeblaze.util.PropertyUtil; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -26,22 +32,17 @@ import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnection; import 
javax.jms.TopicConnectionFactory; -import org.apache.activeblaze.BlazeRuntimeException; -import org.apache.activeblaze.group.BlazeGroupChannelFactory; -import org.apache.activeblaze.group.BlazeGroupConfiguration; -import org.apache.activeblaze.jndi.JNDIStorable; -import org.apache.activeblaze.util.IdGenerator; -import org.apache.activeblaze.util.PropertyUtil; /** * Jms ConnectionFactory implementation * */ public class BlazeJmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory, - TopicConnectionFactory { +TopicConnectionFactory { private static final IdGenerator NAME_GENERATOR = new IdGenerator(); private final BlazeGroupChannelFactory groupChannelFactory; - private Map props = new HashMap(); + private final Map props = new HashMap(); + private int consumerMaxDispatchQueueDepth=10000; /** * Constructor @@ -78,6 +79,7 @@ setProperties(map); } + @Override public void setProperties(Map map) { populateProperties(map); } @@ -86,8 +88,10 @@ * @param props * @see org.apache.activeblaze.jndi.JNDIStorable#buildFromProperties(Map map) */ + @Override protected void buildFromProperties(Map map) { PropertyUtil.setProperties(this.groupChannelFactory.getConfiguration(), map); + PropertyUtil.setProperties(this, map); } /** @@ -95,9 +99,13 @@ * @param map * @see org.apache.activeblaze.jndi.JNDIStorable#populateProperties(Map map) */ + @Override protected void populateProperties(Map map) { try { - PropertyUtil.setProperties(this.groupChannelFactory.getConfiguration(), map); + Map result = PropertyUtil.getProperties(this.groupChannelFactory.getConfiguration()); + map.putAll(result); + result = PropertyUtil.getProperties(this); + map.putAll(result); this.props.putAll(map); } catch (Exception e) { throw new BlazeRuntimeException(e); @@ -205,4 +213,20 @@ throw BlazeJmsExceptionSupport.create(e); } } + + /** + * Get the consumerMaxDispatchQueueDepth + * @return the consumerMaxDispatchQueueDepth + */ + public int getConsumerMaxDispatchQueueDepth() 
{ + return consumerMaxDispatchQueueDepth; + } + + /** + * Set the consumerMaxDispatchQueueDepth + * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set + */ + public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) { + this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth; + } } Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java?rev=741060&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java Thu Feb 5 09:44:01 2009 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activeblaze.jms; + +import org.apache.activeblaze.jms.message.BlazeJmsMessage; + +/** + * Used internally - a listener for BlazeJmsMessages + * + */ +public interface BlazeJmsConsumer { + /** + * Consume a message + * @param message + */ + public void onMessage(BlazeJmsMessage message); +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java Thu Feb 5 09:44:01 2009 @@ -33,7 +33,7 @@ */ public class BlazeJmsDestination extends JNDIStorable implements Externalizable, javax.jms.Destination, Comparable { - protected transient Destination destination; + protected transient final Destination destination; /** * Constructor @@ -57,6 +57,7 @@ * @param name */ public BlazeJmsDestination(String name) { + this(); this.destination.setName(new Buffer(name)); } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java?rev=741060&r1=741059&r2=741060&view=diff 
============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java Thu Feb 5 09:44:01 2009 @@ -16,6 +16,14 @@ */ package org.apache.activeblaze.jms; +import org.apache.activeblaze.Subscription; +import org.apache.activeblaze.jms.message.BlazeJmsMessage; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; @@ -24,26 +32,37 @@ /** * implementation of a Jms Message Consumer - * + * */ -public class BlazeJmsMessageConsumer implements MessageConsumer { - protected final BlazeJmsSession session; - protected final BlazeJmsDestination destination; - private boolean closed; - private MessageListener messageListener; - private String messageSelector = ""; - - protected BlazeJmsMessageConsumer(BlazeJmsSession s,BlazeJmsDestination destination) { - this.session=s; - this.destination=destination; - } +public class BlazeJmsMessageConsumer implements MessageConsumer, BlazeJmsConsumer { + protected final BlazeJmsSession session; + protected final BlazeJmsDestination destination; + protected final Subscription subscription = new Subscription(); + private boolean closed; + private MessageListener messageListener; + private String messageSelector = ""; + private final Lock lock = new ReentrantLock(); + private LinkedBlockingQueue dispatchQueue; + + protected BlazeJmsMessageConsumer(BlazeJmsSession s, BlazeJmsDestination destination,int queueDepth) { + this.session = s; + this.destination = destination; + this.subscription.setDestination(this.destination.getDestination().getData()); + 
this.dispatchQueue= new LinkedBlockingQueue(queueDepth); + } + /** + * @throws JMSException * @see javax.jms.MessageConsumer#close() */ - public void close() { - this.closed=true; + public void close() throws JMSException { + this.closed = true; this.session.remove(this); - + + } + + protected Subscription getSubscription() { + return this.subscription; } /** @@ -52,8 +71,8 @@ * @see javax.jms.MessageConsumer#getMessageListener() */ public MessageListener getMessageListener() throws JMSException { - checkClosed(); - return this.messageListener; + checkClosed(); + return this.messageListener; } /** @@ -62,8 +81,8 @@ * @see javax.jms.MessageConsumer#getMessageSelector() */ public String getMessageSelector() throws JMSException { - checkClosed(); - return this.messageSelector; + checkClosed(); + return this.messageSelector; } /** @@ -73,7 +92,7 @@ */ public Message receive() throws JMSException { checkClosed(); - return null; + return this.dispatchQueue.poll(); } /** @@ -84,7 +103,11 @@ */ public Message receive(long timeout) throws JMSException { checkClosed(); - return null; + try { + return this.dispatchQueue.poll(timeout,TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw BlazeJmsExceptionSupport.create(e); + } } /** @@ -94,7 +117,11 @@ */ public Message receiveNoWait() throws JMSException { checkClosed(); - return null; + Message result = this.dispatchQueue.peek(); + if (result != null){ + this.dispatchQueue.remove(result); + } + return result; } /** @@ -104,23 +131,52 @@ */ public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); - this.messageListener=listener; - + this.lock.lock(); + try{ + this.messageListener = listener; + if (!this.dispatchQueue.isEmpty() && this.messageListener != null){ + List drain = new ArrayList(this.dispatchQueue.size()); + this.dispatchQueue.drainTo(drain); + for (BlazeJmsMessage m:drain){ + this.messageListener.onMessage(m); + } + drain.clear(); + } + }finally{ + 
this.lock.unlock(); + } } - + /** - * @param messageSelector the messageSelector to set - * @throws IllegalStateException + * @param messageSelector + * the messageSelector to set + * @throws IllegalStateException */ public void setMessageSelector(String messageSelector) throws IllegalStateException { checkClosed(); this.messageSelector = messageSelector; } - + protected void checkClosed() throws IllegalStateException { if (this.closed) { throw new IllegalStateException("The MessageProducer is closed"); } } - + + /** + * @param message + * @see org.apache.activeblaze.jms.BlazeJmsConsumer#onMessage(org.apache.activeblaze.jms.message.BlazeJmsMessage) + */ + public void onMessage(BlazeJmsMessage message) { + this.lock.lock(); + try{ + if (this.messageListener != null) { + this.messageListener.onMessage(message); + }else{ + this.dispatchQueue.add(message); + } + }finally{ + lock.unlock(); + } + } } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java Thu Feb 5 09:44:01 2009 @@ -30,8 +30,8 @@ * Constructor * @param s */ - protected BlazeJmsQueueReceiver(BlazeJmsSession s,BlazeJmsDestination d) { - super(s,d); + protected BlazeJmsQueueReceiver(BlazeJmsSession s,BlazeJmsDestination d,int queueDepth) { + super(s,d,queueDepth); } /** Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java URL: 
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java Thu Feb 5 09:44:01 2009 @@ -46,6 +46,7 @@ import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage; import org.apache.activeblaze.jms.message.BlazeJmsMapMessage; import org.apache.activeblaze.jms.message.BlazeJmsMessage; +import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation; import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage; import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage; import org.apache.activeblaze.jms.message.BlazeJmsTextMessage; @@ -135,7 +136,11 @@ */ public MessageConsumer createConsumer(Destination destination) throws JMSException { checkClosed(); - return new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination)); + BlazeJmsDestination dest = BlazeJmsDestination.transform(destination); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsMessageConsumer result = new BlazeJmsMessageConsumer(this,dest,queueDepth ); + add(result); + return result; } /** @@ -147,8 +152,11 @@ */ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkClosed(); - BlazeJmsMessageConsumer result = new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination)); + BlazeJmsDestination dest = BlazeJmsDestination.transform(destination); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsMessageConsumer result = new BlazeJmsMessageConsumer(this, dest,queueDepth); result.setMessageSelector(messageSelector); + add(result); return result; } @@ -164,8 
+172,11 @@ throws JMSException { checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(destination); - BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, NoLocal); + + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, NoLocal,queueDepth); result.setMessageSelector(messageSelector); + add(result); return result; } @@ -179,7 +190,9 @@ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); - BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, false); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, false,queueDepth); + add(result); return result; } @@ -196,8 +209,10 @@ throws JMSException { checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); - BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, noLocal); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, noLocal,queueDepth); result.setMessageSelector(messageSelector); + add(result); return result; } @@ -254,6 +269,7 @@ checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(destination); BlazeJmsMessageProducer result = new BlazeJmsMessageProducer(this, dest); + add(result); return result; } @@ -407,14 +423,16 @@ /** * @param queue - * @return + * @return QueueRecevier * @throws JMSException * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue) */ public QueueReceiver createReceiver(Queue queue) throws JMSException { checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(queue); - BlazeJmsQueueReceiver result = 
new BlazeJmsQueueReceiver(this, dest); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest,queueDepth); + add(result); return result; } @@ -428,8 +446,10 @@ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(queue); - BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest,queueDepth); result.setMessageSelector(messageSelector); + add(result); return result; } @@ -456,6 +476,7 @@ checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); BlazeJmsTopicPublisher result = new BlazeJmsTopicPublisher(this, dest); + add(result); return result; } @@ -468,7 +489,9 @@ public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); - BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, false); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, false,queueDepth); + add(result); return result; } @@ -483,15 +506,26 @@ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkClosed(); BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); - BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, noLocal); + int queueDepth = this.connection.getConsumerMaxDispatchQueueDepth(); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, noLocal,queueDepth); result.setMessageSelector(messageSelector); return result; } + + protected void 
add(BlazeJmsMessageConsumer consumer) throws JMSException { + this.consumers.add(consumer); + this.connection.addMesssageDispatcher(consumer, consumer.getSubscription()); + } - protected void remove(MessageConsumer consumer) { + protected void remove(BlazeJmsMessageConsumer consumer) throws JMSException { this.consumers.remove(consumer); + this.connection.removeMesssageDispatcher(consumer, consumer.getSubscription()); } + protected void add(MessageProducer producer) { + this.producers.add(producer); + } + protected void remove(MessageProducer producer) { this.producers.remove(producer); } @@ -515,9 +549,9 @@ } try { if (destination.isTopic()) { - this.connection.channel.send(destination.getName(), message); + this.connection.channel.broadcast(destination.getDestination(), message); } else { - this.connection.channel.broadcast(destination.getName(), message); + this.connection.channel.send(destination.getDestination(), message); } } catch (Exception e) { throw BlazeJmsExceptionSupport.create(e); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java Thu Feb 5 09:44:01 2009 @@ -25,22 +25,18 @@ * */ public class BlazeJmsTopicSubscriber extends BlazeJmsMessageConsumer implements TopicSubscriber { - private final boolean durable; - private final boolean noLocal; - private String name; - /** * Constructor * * @param s * @param destination */ - protected BlazeJmsTopicSubscriber(BlazeJmsSession s, BlazeJmsDestination destination, - String name, boolean 
durable,boolean noLocal) { - super(s, destination); - this.name = name; - this.durable = durable; - this.noLocal = noLocal; + protected BlazeJmsTopicSubscriber(BlazeJmsSession s, BlazeJmsDestination destination, String name, boolean durable, + boolean noLocal,int queueDepth) { + super(s, destination,queueDepth); + getSubscription().setSubscriberName(name); + getSubscription().setDurable(durable); + getSubscription().setNoLocal(noLocal); } /** @@ -50,7 +46,7 @@ */ public boolean getNoLocal() throws IllegalStateException { checkClosed(); - return this.noLocal; + return getSubscription().isNoLocal(); } /** @@ -69,7 +65,7 @@ */ public boolean isDurable() throws IllegalStateException { checkClosed(); - return this.durable; + return getSubscription().isDurable(); } /** @@ -78,16 +74,6 @@ */ public String getName() throws IllegalStateException { checkClosed(); - return this.name; - } - - /** - * @param name - * the name to set - * @throws IllegalStateException - */ - public void setName(String name) throws IllegalStateException { - checkClosed(); - this.name = name; + return getSubscription().getSubscriberName(); } } Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java?rev=741060&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java Thu Feb 5 09:44:01 2009 @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import org.apache.activeblaze.BlazeMessage; +import org.apache.activeblaze.BlazeMessageListener; +import org.apache.activeblaze.Subscription; +import org.apache.activeblaze.jms.message.BlazeJmsMessage; +import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.jms.JMSException; + +/** + * Dispatcher of Blaze messages + */ +abstract class BlazeMessageDispatcher implements BlazeMessageListener { + protected Map> subscriptionMap = new LinkedHashMap>(); + protected Map listenerMap = new HashMap(); + protected final BlazeJmsConnection connection; + + BlazeMessageDispatcher(BlazeJmsConnection connection) { + this.connection = connection; + } + + void add(BlazeJmsConsumer c, Subscription s) throws JMSException { + synchronized (this.subscriptionMap) { + List list = this.subscriptionMap.get(s); + if (list == null) { + list = new CopyOnWriteArrayList(); + this.subscriptionMap.put(s, list); + addSubscriptionToChannel(s); + } + list.add(c); + } + synchronized (this.listenerMap) { + this.listenerMap.put(c, s); + } + } + + void remove(BlazeJmsConsumer c) throws JMSException { + 
Subscription s = null; + synchronized (this.listenerMap) { + s = this.listenerMap.remove(c); + } + if (s != null) { + List list = null; + synchronized (this.subscriptionMap) { + list = this.subscriptionMap.get(s); + } + if (list != null) { + list.remove(c); + if (list.isEmpty()) { + this.subscriptionMap.remove(s); + removeSubscriptionToChannel(s); + } + } + } + } + + public void onMessage(BlazeMessage message) { + + try { + BlazeJmsMessage result = BlazeJmsMessageTransformation.transformMessage(message); + processMessage(result); + } catch (JMSException e) { + this.connection.onException(e); + } + + } + + protected abstract void addSubscriptionToChannel(Subscription s) throws JMSException; + + protected abstract void removeSubscriptionToChannel(Subscription s) throws JMSException; + + protected abstract void processMessage(BlazeJmsMessage message); +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java?rev=741060&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java Thu Feb 5 09:44:01 2009 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import org.apache.activeblaze.Destination; +import org.apache.activeblaze.Subscription; +import org.apache.activeblaze.jms.message.BlazeJmsMessage; +import java.util.List; +import java.util.Map; +import javax.jms.JMSException; +/** + * Dispatcher of Blaze messages + * + */ +public class BlazeQueueMessageDispatcher extends BlazeMessageDispatcher { + /** + * Constructor + * + * @param channel + */ + BlazeQueueMessageDispatcher(BlazeJmsConnection connection) { + super(connection); + } + + /** + * @param s + * @throws Exception + * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#addSubscriptionToChannel(org.apache.activeblaze.Subscription) + */ + @Override + protected void addSubscriptionToChannel(Subscription s) throws JMSException { + try { + this.connection.channel.addBlazeQueueMessageListener(s, this); + } catch (Exception e) { + throw BlazeJmsExceptionSupport.create(e); + } + } + + /** + * @param s + * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#removeSubscriptionToChannel(org.apache.activeblaze.Subscription) + */ + @Override + protected void removeSubscriptionToChannel(Subscription s) throws JMSException { + try { + this.connection.channel.removeBlazeQueueMessageListener(s); + } catch (Exception e) 
{ + throw BlazeJmsExceptionSupport.create(e); + } + } + + /** + * @param jmsMsg + * @see org.apache.activeblaze.BlazeMessageListener#onMessage(org.apache.activeblaze.BlazeMessage) + */ + public void processMessage(BlazeJmsMessage jmsMsg) { + BlazeJmsConsumer target = null; + Destination destination = jmsMsg.getDestination(); + synchronized (this.subscriptionMap) { + for (Map.Entry> entry : this.subscriptionMap.entrySet()) { + if (entry.getKey().matches(destination)) { + List list = entry.getValue(); + target = list.get(0); + if (list.size() > 0) { + list.remove(0); + list.add(target); + } + this.subscriptionMap.remove(entry.getKey()); + this.subscriptionMap.put(entry.getKey(), entry.getValue()); + break; + } + } + } + if (target != null) { + target.onMessage(jmsMsg); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java?rev=741060&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java Thu Feb 5 09:44:01 2009 @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import org.apache.activeblaze.Destination; +import org.apache.activeblaze.Subscription; +import org.apache.activeblaze.jms.message.BlazeJmsMessage; +import java.util.List; +import java.util.Map; +import javax.jms.JMSException; +/** + * Dispatcher of Blaze messages + * + */ +public class BlazeTopicMessageDispatcher extends BlazeMessageDispatcher { + /** + * Constructor + * + * @param channel + */ + BlazeTopicMessageDispatcher(BlazeJmsConnection connection) { + super(connection); + } + + /** + * @param s + * @throws Exception + * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#addSubscriptionToChannel(org.apache.activeblaze.Subscription) + */ + @Override + protected void addSubscriptionToChannel(Subscription s) throws JMSException { + try { + this.connection.channel.addBlazeTopicMessageListener(s, this); + } catch (Exception e) { + throw BlazeJmsExceptionSupport.create(e); + } + } + + /** + * @param s + * @see org.apache.activeblaze.jms.BlazeMessageDispatcher#removeSubscriptionToChannel(org.apache.activeblaze.Subscription) + */ + @Override + protected void removeSubscriptionToChannel(Subscription s) throws JMSException { + try { + this.connection.channel.removeBlazeTopicMessageListener(s); + } catch (Exception e) { + throw 
BlazeJmsExceptionSupport.create(e); + } + } + + /** + * @param jmsMsg + * @see org.apache.activeblaze.BlazeMessageListener#onMessage(org.apache.activeblaze.BlazeMessage) + */ + public void processMessage(BlazeJmsMessage jmsMsg) { + Destination destination = jmsMsg.getDestination(); + synchronized (this.subscriptionMap) { + for (Map.Entry> entry : this.subscriptionMap.entrySet()) { + if (entry.getKey().matches(destination)) { + List list = entry.getValue(); + for (BlazeJmsConsumer c : list) { + c.onMessage(jmsMsg.clone()); + } + } + } + } + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java Thu Feb 5 09:44:01 2009 @@ -29,6 +29,7 @@ import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; import org.apache.activeblaze.jms.BlazeJmsExceptionSupport; +import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activeblaze.wire.BlazeData; import org.apache.activemq.protobuf.Buffer; import 
org.apache.activemq.protobuf.BufferInputStream; @@ -100,6 +101,14 @@ copy.bytesOut = null; copy.dataIn = null; } + + /** + * @return the type + * @see org.apache.activeblaze.BlazeMessage#getType() + */ + public int getType(){ + return JmsMessageType.BYTES.ordinal(); + } public void storeContent() { super.storeContent(); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java Thu Feb 5 09:44:01 2009 @@ -27,6 +27,7 @@ import javax.jms.MessageNotWriteableException; import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; +import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activeblaze.wire.BlazeData; import org.apache.activeblaze.wire.MapData; import org.apache.activemq.protobuf.Buffer; @@ -98,6 +99,14 @@ storeContent(); super.copy(copy); } + + /** + * @return the type + * @see org.apache.activeblaze.BlazeMessage#getType() + */ + public int getType(){ + return JmsMessageType.MAP.ordinal(); + } public void storeContent() { super.storeContent(); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- 
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java Thu Feb 5 09:44:01 2009 @@ -35,10 +35,16 @@ * */ public class BlazeJmsMessage extends BlazeMessage implements Message { + + protected enum JmsMessageType{MESSAGE,BYTES,MAP,OBJECT,STREAM,TEXT} protected transient Callback acknowledgeCallback; protected transient BlazeJmsDestination jmsDestination; protected transient BlazeJmsDestination jmsReplyToDestination; + /** + * @return clone of a BlazeMessage + * @see org.apache.activeblaze.BlazeMessage#clone() + */ public BlazeJmsMessage clone() { BlazeJmsMessage copy = new BlazeJmsMessage(); try { @@ -55,6 +61,14 @@ copy.jmsDestination = this.jmsDestination; copy.jmsReplyToDestination = this.jmsReplyToDestination; } + + /** + * @return the type + * @see org.apache.activeblaze.BlazeMessage#getType() + */ + public int getType(){ + return JmsMessageType.MESSAGE.ordinal(); + } /** * @return the acknowledge Callback @@ -261,12 +275,12 @@ } /** - * @return + * @return the message type * @throws JMSException * @see javax.jms.Message#getJMSType() */ public String getJMSType() throws JMSException { - return getType(); + return getMessageType(); } /** @@ -470,7 +484,7 @@ * @see javax.jms.Message#setJMSType(java.lang.String) */ public void setJMSType(String type) { - setType(type); + setMessageType(type); } /** Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java (from r739885, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java) URL: 
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java&r1=739885&r2=741060&rev=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java Thu Feb 5 09:44:01 2009 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activeblaze.jms; +package org.apache.activeblaze.jms.message; import java.util.Enumeration; import javax.jms.BytesMessage; @@ -26,15 +26,11 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; -import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage; -import org.apache.activeblaze.jms.message.BlazeJmsMapMessage; -import org.apache.activeblaze.jms.message.BlazeJmsMessage; -import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage; -import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage; -import org.apache.activeblaze.jms.message.BlazeJmsTextMessage; - +import org.apache.activeblaze.BlazeMessage; +import org.apache.activeblaze.jms.BlazeJmsDestination; /** - * A helper class for converting normal JMS interfaces into ActiveMQ specific ones. + * A helper class for converting normal JMS interfaces into ActiveMQ specific + * ones. 
* * @version $Revision: 1.1 $ */ @@ -45,12 +41,37 @@ /** * @param dest * @return a BlazeJmsDestination - * @throws JMSException + * @throws JMSException */ private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException { return BlazeJmsDestination.transform(dest); } - + + /** + * @param message + * @return a BlazeJmsMessage + * @throws JMSException + */ + public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException { + BlazeJmsMessage result = null; + int type = message.getType(); + if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) { + result = new BlazeJmsBytesMessage(); + } else if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) { + result = new BlazeJmsMapMessage(); + } else if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) { + result = new BlazeJmsObjectMessage(); + } else if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) { + result = new BlazeJmsStreamMessage(); + } else if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) { + result = new BlazeJmsTextMessage(); + } else { + result = new BlazeJmsMessage(); + } + result.setContent(message.getContent()); + return result; + } + /** * @param message * @return a BlazeJmsDestination @@ -118,7 +139,8 @@ } /** - * Copies the standard JMS and user defined properties from the givem message to the specified message + * Copies the standard JMS and user defined properties from the givem + * message to the specified message * * @param fromMessage * the message to take the properties from @@ -144,6 +166,4 @@ toMessage.setObjectProperty(name, obj); } } - - } Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java 
------------------------------------------------------------------------------ svn:executable = * Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java Thu Feb 5 09:44:01 2009 @@ -27,6 +27,7 @@ import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; import org.apache.activeblaze.jms.BlazeJmsExceptionSupport; +import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.BufferInputStream; @@ -75,6 +76,14 @@ super.copy(copy); copy.object = null; } + + /** + * @return the type + * @see org.apache.activeblaze.BlazeMessage#getType() + */ + public int getType(){ + return JmsMessageType.OBJECT.ordinal(); + } public void storeContent() { super.storeContent(); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (original) +++ 
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java Thu Feb 5 09:44:01 2009 @@ -30,6 +30,7 @@ import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; import org.apache.activeblaze.jms.BlazeJmsExceptionSupport; +import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activeblaze.wire.BlazeData; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.BufferInputStream; @@ -119,6 +120,14 @@ copy.bytesOut = null; copy.dataIn = null; } + + /** + * @return the type + * @see org.apache.activeblaze.BlazeMessage#getType() + */ + public int getType(){ + return JmsMessageType.STREAM.ordinal(); + } public void storeContent() { super.storeContent(); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java Thu Feb 5 09:44:01 2009 @@ -23,6 +23,7 @@ import javax.jms.TextMessage; import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.BlazeRuntimeException; +import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.BufferInputStream; import org.apache.activemq.protobuf.BufferOutputStream; @@ -53,6 +54,14 @@ super.copy(copy); copy.text = this.text; } + + /** + * @return the type + * @see org.apache.activeblaze.BlazeMessage#getType() + */ + public int getType(){ + return 
JmsMessageType.TEXT.ordinal(); + } public void setText(String text) { this.text = text; Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java Thu Feb 5 09:44:01 2009 @@ -22,8 +22,6 @@ import java.io.ObjectOutput; import java.util.HashMap; import java.util.Map; -import java.util.Properties; - import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=741060&r1=741059&r2=741060&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original) +++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Thu Feb 5 09:44:01 2009 @@ -55,7 +55,7 @@ message AckData { //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; //| option java_type_method = "MessageType"; - required int64 id =1; + optional int64 id =1; optional int64 startSequence =2; optional int64 endSequence =3; optional int64 sessionId = 4; @@ -66,7 +66,7 @@ message NackData { //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; //| option java_type_method = "MessageType"; - required int64 id =1; + optional int64 id =1; optional int64 startSequence =2; optional int64 endSequence =3; optional int64 sessionId = 4; @@ -75,7 +75,7 @@ message ControlData { //| option 
java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; //| option java_type_method = "MessageType"; - required int64 lastId =1; //last ack or nack id + optional int64 lastId =1; //last ack or nack id } message DestinationData { @@ -85,12 +85,13 @@ } message SubscriptionData { - optional bool durable =1; - optional int32 weight = 2; - optional string channelName =3; - optional string subscriberName =4; - optional string selector =5; - optional DestinationData destinationData =6; + optional bool durable = 1; + optional bool noLocal = 2; + optional int32 weight = 3; + optional string channelName = 4; + optional string subscriberName = 5; + optional string selector = 6; + optional DestinationData destinationData = 7; } message MemberData { @@ -241,15 +242,16 @@ optional bool persistent = 1; optional int32 priority = 2; optional int32 redeliveryCounter = 3; - optional int64 timestamp = 4; - optional int64 expiration = 5; - optional bytes messageId = 6; - optional bytes correlationId = 7; - optional bytes fromId =8; - optional bytes type = 9; - optional bytes payload = 10; - optional DestinationData destinationData = 11; - optional DestinationData replyToData = 12; + optional int32 type =4; + optional int64 timestamp = 5; + optional int64 expiration = 6; + optional bytes messageId = 7; + optional bytes correlationId = 8; + optional bytes fromId =9; + optional bytes messageType = 10; + optional bytes payload = 11; + optional DestinationData destinationData = 12; + optional DestinationData replyToData = 13; optional MapData mapData = 14; optional bytes payload = 15; Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java?rev=741060&view=auto ============================================================================== --- 
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java Thu Feb 5 09:44:01 2009 @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activeblaze.jms.perf; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision: 1.3 $ + */ +public class PerfConsumer implements MessageListener { + private static final Log LOG = LogFactory.getLog(PerfConsumer.class); + protected Connection connection; + protected MessageConsumer consumer; + protected long sleepDuration; + protected long initialDelay; + protected boolean firstMessage = true; + protected PerfRate rate = new PerfRate(); + + public PerfConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException { + connection = fac.createConnection(); + connection.setClientID(consumerName!=null ? consumerName:"Consumer"); + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) { + consumer = s.createDurableSubscriber((Topic) dest, consumerName); + } else { + consumer = s.createConsumer(dest); + } + consumer.setMessageListener(this); + } + + public PerfConsumer(ConnectionFactory fac, Destination dest) throws JMSException { + this(fac, dest, null); + } + + public void start() throws JMSException { + connection.start(); + rate.reset(); + } + + public void stop() throws JMSException { + connection.stop(); + } + + public void shutDown() throws JMSException { + connection.close(); + } + + public PerfRate getRate() { + return rate; + } + + public void onMessage(Message msg) { + if (firstMessage) { + firstMessage = false; + if (getInitialDelay() > 0) { + try { + Thread.sleep(getInitialDelay()); + } catch (InterruptedException e) { + } + } + } + rate.increment(); + try { + if 
(sleepDuration != 0) { + Thread.sleep(sleepDuration); + } + } catch (InterruptedException e) { + } + } + + public synchronized long getSleepDuration() { + return sleepDuration; + } + + public synchronized void setSleepDuration(long sleepDuration) { + this.sleepDuration = sleepDuration; + } + + /** + * @return the initialDelay + */ + public long getInitialDelay() { + return initialDelay; + } + + /** + * @param initialDelay + * the initialDelay to set + */ + public void setInitialDelay(long initialDelay) { + this.initialDelay = initialDelay; + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java?rev=741060&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java Thu Feb 5 09:44:01 2009 @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms.perf; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * @version $Revision: 1.3 $ + */ +public class PerfProducer implements Runnable { + protected Connection connection; + protected MessageProducer producer; + protected PerfRate rate = new PerfRate(); + private byte[] payload; + private Session session; + private final CountDownLatch stopped = new CountDownLatch(1); + private boolean running; + private int sleep = 0; + + public PerfProducer(ConnectionFactory fac, Destination dest, byte[] palyload) throws JMSException { + connection = fac.createConnection(); + connection.setClientID("Producer"); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(dest); + this.payload = palyload; + } + + public void setDeliveryMode(int mode) throws JMSException { + producer.setDeliveryMode(mode); + } + + public void setTimeToLive(int ttl) throws JMSException { + producer.setTimeToLive(ttl); + } + + public void shutDown() throws JMSException { + connection.close(); + } + + public PerfRate getRate() { + return rate; + } + + public synchronized void start() throws JMSException { + if (!running) { + rate.reset(); + running = true; + connection.start(); + Thread t = new Thread(this); + t.setName("Producer"); + 
t.start(); + } + } + + public void stop() throws JMSException, InterruptedException { + synchronized (this) { + running = false; + } + stopped.await(1,TimeUnit.SECONDS); + connection.stop(); + } + + public synchronized boolean isRunning() { + return running; + } + + public void run() { + try { + while (isRunning()) { + BytesMessage msg; + msg = session.createBytesMessage(); + msg.writeBytes(payload); + producer.send(msg); + rate.increment(); + if (sleep > 0) { + Thread.sleep(sleep); + } + } + } catch (Throwable e) { + e.printStackTrace(); + } finally { + stopped.countDown(); + } + } + + public int getSleep() { + return sleep; + } + + public void setSleep(int sleep) { + this.sleep = sleep; + } + +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java?rev=741060&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java Thu Feb 5 09:44:01 2009 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Message counter that tracks a running total plus a per-sample count from
 * which a messages-per-second rate is derived.
 * <p>
 * {@link #increment()}, {@link #reset()}, {@link #getRate()} and
 * {@link #cloneAndReset()} all synchronize on this instance so that a sample
 * is taken from a consistent count/start-time pair.
 *
 * @version $Revision: 1.3 $
 */
public class PerfRate {

    protected int totalCount;
    protected int count;
    protected long startTime = System.currentTimeMillis();

    /**
     * @return the total number of increments recorded (same value as
     *         {@link #getTotalCount()}); unaffected by resets.
     */
    public int getCount() {
        return totalCount;
    }

    /**
     * Records one message in both the total and the current sample.
     */
    public synchronized void increment() {
        totalCount++;
        count++;
    }

    /**
     * Computes the rate of the current sample in messages per second.
     *
     * @return the sample count scaled to one second, {@code 0} when no
     *         measurable time has elapsed since the sample started, and
     *         capped at {@link Integer#MAX_VALUE}.
     */
    public synchronized int getRate() {
        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (elapsedMillis <= 0) {
            // Sampled within the same millisecond as the reset (or the clock
            // moved backwards): dividing would throw or yield nonsense.
            return 0;
        }
        // Multiply as long: count * 1000 overflows int above ~2.1M messages.
        long perSecond = (count * 1000L) / elapsedMillis;
        return perSecond > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) perSecond;
    }

    /**
     * Takes a snapshot of the current sample and starts a new one.
     *
     * @return a copy holding the total, the sample count and the sample start
     *         time as they were before this call.
     */
    public synchronized PerfRate cloneAndReset() {
        PerfRate snapshot = new PerfRate();
        snapshot.totalCount = totalCount;
        snapshot.count = count;
        snapshot.startTime = startTime;
        count = 0;
        startTime = System.currentTimeMillis();
        return snapshot;
    }

    /**
     * Resets the rate sampling; the running total is kept.
     */
    public synchronized void reset() {
        count = 0;
        startTime = System.currentTimeMillis();
    }

    /**
     * @return Returns the totalCount.
     */
    public int getTotalCount() {
        return totalCount;
    }

    /**
     * @param totalCount The totalCount to set.
     */
    public void setTotalCount(int totalCount) {
        this.totalCount = totalCount;
    }
}
+ */ +package org.apache.activeblaze.jms.perf; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activeblaze.jms.BlazeJmsConnectionFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision: 1.3 $ + */ +public class SimpleTopicTest extends TestCase { + + private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class); + + + protected PerfProducer[] producers; + protected PerfConsumer[] consumers; + protected String destinationName = getClass().getName(); + protected int sampleCount = 20; + protected long sampleInternal = 10000; + protected int numberOfDestinations=1; + protected int numberOfConsumers = 1; + protected int numberofProducers = 1; + protected int totalNumberOfProducers; + protected int totalNumberOfConsumers; + protected int playloadSize = 12; + protected byte[] array; + protected ConnectionFactory factory; + + /** + * Sets up a test where the producer and consumer have their own connection. 
+ * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + + factory = createConnectionFactory(); + Connection con = factory.createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s) per " + numberOfDestinations + " Destination(s)"); + + totalNumberOfConsumers=numberOfConsumers*numberOfDestinations; + totalNumberOfProducers=numberofProducers*numberOfDestinations; + producers = new PerfProducer[totalNumberOfProducers]; + consumers = new PerfConsumer[totalNumberOfConsumers]; + int consumerCount = 0; + int producerCount = 0; + for (int k =0; k < numberOfDestinations;k++) { + Destination destination = createDestination(session, destinationName+":"+k); + LOG.info("Testing against destination: " + destination); + for (int i = 0; i < numberOfConsumers; i++) { + consumers[consumerCount] = createConsumer(factory, destination, consumerCount); + consumerCount++; + } + for (int i = 0; i < numberofProducers; i++) { + array = new byte[playloadSize]; + for (int j = i; j < array.length; j++) { + array[j] = (byte)j; + } + producers[producerCount] = createProducer(factory, destination, i, array); + producerCount++; + } + } + con.close(); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + for (int i = 0; i < numberOfConsumers; i++) { + + } + for (int i = 0; i < numberofProducers; i++) { + producers[i].shutDown(); + } + + } + + protected Destination createDestination(Session s, String destinationName) throws JMSException { + return s.createTopic(destinationName); + } + + /** + * Factory method to create a new broker + * + * @throws Exception + */ + + + protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException { + return new PerfProducer(fac, dest, payload); + } + + protected PerfConsumer 
createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { + return new PerfConsumer(fac, dest); + } + + + + + protected ConnectionFactory createConnectionFactory() throws Exception { + int portNum = 61616; + String uri = "static://("; + int count = this.numberofProducers + this.numberOfConsumers; + for (int i = 0; i < count;i++) { + uri += "udp://localhost:" + (portNum++); + uri += ","; + } + uri += ")"; + BlazeJmsConnectionFactory result = new BlazeJmsConnectionFactory(); + result.getConfiguration().setReliableBroadcast("swp"); + result.getConfiguration().setManagementURI(""); + result.getConfiguration().setBroadcastURI(uri); + return result; + } + + public void testPerformance() throws JMSException, InterruptedException { + for (int i = 0; i < totalNumberOfConsumers; i++) { + consumers[i].start(); + } + for (int i = 0; i < totalNumberOfProducers; i++) { + producers[i].start(); + } + LOG.info("Sampling performance " + sampleCount + " times at a " + sampleInternal + " ms interval."); + for (int i = 0; i < sampleCount; i++) { + Thread.sleep(sampleInternal); + dumpProducerRate(); + dumpConsumerRate(); + } + for (int i = 0; i < totalNumberOfProducers; i++) { + producers[i].stop(); + } + for (int i = 0; i < totalNumberOfConsumers; i++) { + consumers[i].stop(); + } + } + + protected void dumpProducerRate() { + int totalRate = 0; + int totalCount = 0; + String producerString="Producers:"; + for (int i = 0; i < producers.length; i++) { + PerfRate rate = producers[i].getRate().cloneAndReset(); + totalRate += rate.getRate(); + totalCount += rate.getTotalCount(); + producerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; + } + if (producers != null && producers.length > 0) { + int avgRate = totalRate / producers.length; + System.out.println("Avg producer rate = " + avgRate + + " msg/sec | Total rate = " + totalRate + ", sent = " + + totalCount); + // System.out.println(producerString); + } + } + + protected void 
dumpConsumerRate() { + int totalRate = 0; + int totalCount = 0; + String consumerString="Consumers:"; + for (int i = 0; i < consumers.length; i++) { + PerfRate rate = consumers[i].getRate().cloneAndReset(); + totalRate += rate.getRate(); + totalCount += rate.getTotalCount(); + consumerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; + } + if (consumers != null && consumers.length > 0) { + int avgRate = totalRate / consumers.length; + System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " + totalRate + ", received = " + totalCount); + System.out.println(consumerString); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain