From commits-return-10393-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Thu Mar 12 18:59:51 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 80399 invoked from network); 12 Mar 2009 18:59:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Mar 2009 18:59:51 -0000 Received: (qmail 63725 invoked by uid 500); 12 Mar 2009 18:59:51 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 63705 invoked by uid 500); 12 Mar 2009 18:59:51 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 63691 invoked by uid 99); 12 Mar 2009 18:59:51 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Mar 2009 11:59:51 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Mar 2009 18:59:35 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D2E0623889B7; Thu, 12 Mar 2009 18:59:13 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r752973 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/ test/java/org/apache/activemq/... Date: Thu, 12 Mar 2009 18:59:13 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090312185913.D2E0623889B7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Mar 12 18:59:12 2009 New Revision: 752973 URL: http://svn.apache.org/viewvc?rev=752973&view=rev Log: Refactoring the MockBroker so that it's is more agnostic of the wire protocol used. Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,219 @@ +package org.apache.activemq; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.broker.DeliveryTarget; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.IFlowLimiter; +import org.apache.activemq.flow.IFlowRelay; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.Message; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.queue.SingleFlowRelay; +import org.apache.activemq.transport.DispatchableTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportListener; + +abstract public class Connection implements TransportListener, DeliveryTarget { + + protected Transport transport; + + protected String name; + + private int priorityLevels; + + protected final int outputWindowSize = 1000; + protected final int outputResumeThreshold = 900; + + protected final int inputWindowSize = 1000; + protected final int inputResumeThreshold = 500; + + protected IFlowRelay outputQueue; + + private IDispatcher dispatcher; + private final AtomicBoolean stopping = new AtomicBoolean(); + protected Flow outputFlow; + protected boolean blockingTransport = false; + protected ExecutorService blockingWriter; + + public void setTransport(Transport transport) { + this.transport = transport; + } + + public void start() throws Exception { + transport.setTransportListener(this); + transport.start(); + } + + public void stop() throws Exception { + stopping.set(true); + if (transport != null) { + transport.stop(); + } + if (blockingWriter != null) { + blockingWriter.shutdown(); + } + } + + protected void initialize() { + } + + protected final void write(final Object o) { + synchronized (outputQueue) { + if (!blockingTransport) { + try { + transport.oneway(o); + } catch (IOException e) { + onException(e); + } + } else { + try { + blockingWriter.execute(new Runnable() { + public void run() { + if (!stopping.get()) { + try { + transport.oneway(o); + } catch (IOException e) { + onException(e); + } + } + } + }); + } catch (RejectedExecutionException re) { + //Must be shutting down. + } + } + } + } + + public void onException(IOException error) { + onException((Exception) error); + } + + public void onException(Exception error) { + if (!isStopping()) { + System.out.println("RemoteConnection error: " + error); + error.printStackTrace(); + } + } + + public boolean isStopping(){ + return stopping.get(); + } + + public void transportInterupted() { + } + + public void transportResumed() { + } + + public String getName() { + return name; + } + + public int getPriorityLevels() { + return priorityLevels; + } + + public void setPriorityLevels(int priorityLevels) { + this.priorityLevels = priorityLevels; + } + + public IDispatcher getDispatcher() { + return dispatcher; + } + + public void setDispatcher(IDispatcher dispatcher) { + this.dispatcher = dispatcher; + if (transport instanceof DispatchableTransport) { + DispatchableTransport dt = ((DispatchableTransport) transport); + if (name != null) { + dt.setName(name); + } + dt.setDispatcher(getDispatcher()); + } + } + + public int getOutputWindowSize() { + return outputWindowSize; + } + + public int getOutputResumeThreshold() { + return outputResumeThreshold; + } + + public int getInputWindowSize() { + return inputWindowSize; + } + + public int getInputResumeThreshold() { + return inputResumeThreshold; + } + + public IFlowRelay getSink() { + return outputQueue; + } + + public boolean match(MessageDelivery message) { + return true; + } + + protected interface ProtocolLimiter extends IFlowLimiter { + public void onProtocolCredit(int credit); + } + + protected class WindowLimiter extends SizeLimiter implements ProtocolLimiter { + final Flow flow; + final boolean clientMode; + private int available; + + public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) { + super(capacity, resumeThreshold); + this.clientMode = clientMode; + this.flow = flow; + } + + public void reserve(E elem) { + super.reserve(elem); +// if (!clientMode) { +// System.out.println(name + " Reserved " + this); +// } + } + + public void releaseReserved(E elem) { + super.reserve(elem); +// if (!clientMode) { +// System.out.println(name + " Released Reserved " + this); +// } + } + + protected void remove(int size) { + super.remove(size); + if (!clientMode) { + available += size; + if (available >= capacity - resumeThreshold) { + sendCredit(available); + available = 0; + } + } + } + + protected void sendCredit(int credit) { + throw new UnsupportedOperationException("Please override this method to provide and implemenation."); + } + + public void onProtocolCredit(int credit) { + remove(credit); + } + + public int getElementSize(MessageDelivery m) { + return m.getFlowLimiterSize(); + } + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,124 @@ +package org.apache.activemq.broker; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.Connection; +import org.apache.activemq.broker.openwire.OpenwireBrokerConnection; +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.transport.DispatchableTransportServer; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportServer; + +public class Broker implements TransportAcceptListener { + + public static final int MAX_USER_PRIORITY = 10; + public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1; + + final Router router = new Router(); + + final ArrayList clientConnections = new ArrayList(); + final HashMap queues = new HashMap(); + + private TransportServer transportServer; + private String uri; + private String name; + private IDispatcher dispatcher; + private final AtomicBoolean stopping = new AtomicBoolean(); + + public String getName() { + return name; + } + + + public void addQueue(Queue queue) { + Domain domain = router.getDomain(queue.getDestination().getDomain()); + domain.add(queue.getDestination().getName(), queue); + } + + public final void stop() throws Exception { + stopping.set(true); + transportServer.stop(); + + for (Connection connection : clientConnections) { + connection.stop(); + } + for (Queue queue : queues.values()) { + queue.stop(); + } + dispatcher.shutdown(); + + } + + public final void start() throws Exception { + + dispatcher.start(); + + transportServer = TransportFactory.bind(new URI(uri)); + transportServer.setAcceptListener(this); + if (transportServer instanceof DispatchableTransportServer) { + ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher); + } + transportServer.start(); + + for (Queue queue : queues.values()) { + queue.start(); + } + } + + public void onAccept(final Transport transport) { + OpenwireBrokerConnection connection = new OpenwireBrokerConnection(); + connection.setBroker(this); + connection.setTransport(transport); + connection.setPriorityLevels(MAX_PRIORITY); + connection.setDispatcher(dispatcher); + clientConnections.add(connection); + try { + connection.start(); + } catch (Exception e1) { + onAcceptError(e1); + } + } + + public void onAcceptError(Exception error) { + System.out.println("Accept error: " + error); + error.printStackTrace(); + } + + public IDispatcher getDispatcher() { + return dispatcher; + } + + public void setName(String name) { + this.name = name; + } + + public void setDispatcher(IDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public URI getConnectURI() { + return transportServer.getConnectURI(); + } + + public boolean isStopping() { + return stopping.get(); + } + + public Router getRouter() { + return router; + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,23 @@ +package org.apache.activemq.broker; + +import org.apache.activemq.Connection; + +abstract public class BrokerConnection extends Connection { + + protected Broker broker; + + public Broker getBroker() { + return broker; + } + + public void setBroker(Broker broker) { + this.broker = broker; + } + + @Override + public boolean isStopping() { + return super.isStopping() || broker.isStopping(); + } + + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,14 @@ +/** + * + */ +package org.apache.activemq.broker; + +import org.apache.activemq.flow.IFlowSink; + +public interface DeliveryTarget { + + public IFlowSink getSink(); + + public boolean match(MessageDelivery message); + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,84 @@ +package org.apache.activemq.broker; + +import java.util.Collection; + +import org.apache.activemq.protobuf.AsciiBuffer; + +public interface Destination { + + AsciiBuffer getDomain(); + AsciiBuffer getName(); + Collection getDestinations(); + + public class SingleDestination implements Destination { + + private AsciiBuffer domain; + private AsciiBuffer name; + + public SingleDestination() { + } + public SingleDestination(AsciiBuffer domain, AsciiBuffer name) { + setDomain(domain); + setName(name); + } + public SingleDestination(String domain, String name) { + setDomain(domain); + setName(name); + } + + public Collection getDestinations() { + return null; + } + + public AsciiBuffer getDomain() { + return domain; + } + + public AsciiBuffer getName() { + return name; + } + public void setDomain(AsciiBuffer domain) { + this.domain = domain; + } + public void setName(AsciiBuffer name) { + this.name = name; + } + + private void setName(String name) { + setName(new AsciiBuffer(name)); + } + private void setDomain(String domain) { + setDomain(new AsciiBuffer(domain)); + } + } + + public class MultiDestination implements Destination { + + private Collection destinations; + + public MultiDestination() { + } + + public MultiDestination(Collection destinations) { + this.destinations=destinations; + } + + public Collection getDestinations() { + return destinations; + } + + public void setDestinations(Collection destinations) { + this.destinations = destinations; + } + + public AsciiBuffer getDomain() { + return null; + } + + public AsciiBuffer getName() { + return null; + } + + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,23 @@ +package org.apache.activemq.broker; + +import java.util.Collection; + +import org.apache.activemq.protobuf.AsciiBuffer; + +/** + * Represents a messaging domain like pub/sub or point to point in JMS terms or an Exchange in + * AMQP terms. + * + * @author chirino + */ +public interface Domain { + + public void add(AsciiBuffer name, Object value); + + public Object remove(AsciiBuffer name); + + public void bind(AsciiBuffer name, DeliveryTarget dt); + + public Collection route(MessageDelivery msg); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,22 @@ +package org.apache.activemq.broker; + +import org.apache.activemq.protobuf.AsciiBuffer; + +public interface MessageDelivery { + + public Destination getDestination(); + + public int getPriority(); + + public int getFlowLimiterSize(); + + public AsciiBuffer getMsgId(); + + public AsciiBuffer getProducerId(); + + public void setCompletionCallback(Runnable runnable); + public Runnable getCompletionCallback(); + + public T asType(Class type); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,159 @@ +/** + * + */ +package org.apache.activemq.broker; + +import java.util.HashMap; + +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.PrioritySizeLimiter; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.IQueue; +import org.apache.activemq.queue.Mapper; +import org.apache.activemq.queue.PartitionedQueue; +import org.apache.activemq.queue.SharedPriorityQueue; +import org.apache.activemq.queue.SharedQueue; +import org.apache.activemq.queue.Subscription; + +public class Queue implements DeliveryTarget { + + HashMap> subs = new HashMap>(); + private Destination destination; + private IQueue queue; + private Broker broker; + + private Mapper partitionMapper; + private Mapper keyExtractor; + + private IQueue createQueue() { + + if (partitionMapper!=null) { + PartitionedQueue queue = new PartitionedQueue() { + @Override + protected IQueue cratePartition(Integer partitionKey) { + return createSharedFlowQueue(); + } + }; + queue.setPartitionMapper(partitionMapper); + queue.setResourceName(destination.getName().toString()); + return queue; + } else { + return createSharedFlowQueue(); + } + } + + + public static final Mapper PRIORITY_MAPPER = new Mapper() { + public Integer map(MessageDelivery element) { + return element.getPriority(); + } + }; + + private IQueue createSharedFlowQueue() { + if (Broker.MAX_PRIORITY > 1) { + PrioritySizeLimiter limiter = new PrioritySizeLimiter(100, 1, Broker.MAX_PRIORITY); + limiter.setPriorityMapper(PRIORITY_MAPPER); + SharedPriorityQueue queue = new SharedPriorityQueue(destination.getName().toString(), limiter); + queue.setKeyMapper(keyExtractor); + queue.setAutoRelease(true); + queue.setDispatcher(broker.getDispatcher()); + return queue; + } else { + SizeLimiter limiter = new SizeLimiter(100, 1); + SharedQueue queue = new SharedQueue(destination.getName().toString(), limiter); + queue.setKeyMapper(keyExtractor); + queue.setAutoRelease(true); + queue.setDispatcher(broker.getDispatcher()); + return queue; + } + } + + public final void deliver(ISourceController source, MessageDelivery msg) { + queue.add(msg, source); + } + + public final Destination getDestination() { + return destination; + } + + public final void addConsumer(final DeliveryTarget dt) { + Subscription sub = new Subscription() { + public boolean isPreAcquired() { + return true; + } + + public boolean matches(MessageDelivery message) { + return dt.match(message); + } + + public boolean isRemoveOnDispatch() { + return true; + } + + public IFlowSink getSink() { + return dt.getSink(); + } + + @Override + public String toString() { + return getSink().toString(); + } + }; + subs.put(dt, sub); + queue.addSubscription(sub); + } + + public boolean removeSubscirption(final DeliveryTarget dt) { + Subscription sub = subs.remove(dt); + if (sub != null) { + return queue.removeSubscription(sub); + } + return false; + } + + public void start() throws Exception { + queue = createQueue(); + } + + public void stop() throws Exception { + } + + public IFlowSink getSink() { + return queue; + } + + public boolean match(MessageDelivery message) { + return true; + } + + public Broker getBroker() { + return broker; + } + + public void setBroker(Broker broker) { + this.broker = broker; + } + + public Mapper getPartitionMapper() { + return partitionMapper; + } + + public void setPartitionMapper(Mapper partitionMapper) { + this.partitionMapper = partitionMapper; + } + + public Mapper getKeyExtractor() { + return keyExtractor; + } + + public void setKeyExtractor(Mapper keyExtractor) { + this.keyExtractor = keyExtractor; + } + + public void setDestination(Destination destination) { + this.destination = destination; + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,34 @@ +package org.apache.activemq.broker; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; + +import org.apache.activemq.protobuf.AsciiBuffer; + +public class QueueDomain implements Domain { + + final HashMap queues = new HashMap(); + + public void add(AsciiBuffer name, Object queue) { + queues.put(name, (Queue)queue); + } + public Object remove(AsciiBuffer name) { + return queues.remove(name); + } + + public void bind(AsciiBuffer name, DeliveryTarget deliveryTarget) { + queues.get(name).addConsumer(deliveryTarget); + } + + public Collection route(MessageDelivery name) { + Queue queue = queues.get(name); + if( queue!=null ) { + ArrayList rc = new ArrayList(1); + rc.add(queue); + return rc; + } + return null; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,46 @@ +/** + * + */ +package org.apache.activemq.broker; + +import java.util.Collection; +import java.util.HashMap; + +import org.apache.activemq.protobuf.AsciiBuffer; + +final public class Router { + + public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic"); + public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue"); + + private final HashMap domains = new HashMap(); + + public Router() { + domains.put(QUEUE_DOMAIN, new QueueDomain()); + domains.put(TOPIC_DOMAIN, new TopicDomain()); + } + + public Domain getDomain(AsciiBuffer name) { + return domains.get(name); + } + + public Domain putDomain(AsciiBuffer name, Domain domain) { + return domains.put(name, domain); + } + + public Domain removeDomain(Object name) { + return domains.remove(name); + } + + + public synchronized void bind(Destination destination, DeliveryTarget dt) { + Domain domain = domains.get(destination.getDomain()); + domain.bind(destination.getName(), dt); + } + + public Collection route(MessageDelivery msg) { + Domain domain = domains.get(msg.getDestination().getDomain()); + return domain.route(msg); + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,32 @@ +package org.apache.activemq.broker; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; + +import org.apache.activemq.protobuf.AsciiBuffer; + +public class TopicDomain implements Domain { + + final HashMap> topicsTargets = new HashMap>(); + + public void add(AsciiBuffer name, Object queue) { + } + public Object remove(AsciiBuffer name) { + return null; + } + + public void bind(AsciiBuffer name, DeliveryTarget target) { + ArrayList targets = topicsTargets.get(name); + if (targets == null) { + targets = new ArrayList(); + topicsTargets.put(name, targets); + } + targets.add(target); + } + + public Collection route(MessageDelivery name) { + return topicsTargets.get(name); + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,68 @@ +package org.apache.activemq.broker.openwire; + +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.command.Message; +import org.apache.activemq.protobuf.AsciiBuffer; + +public class OpenWireMessageDelivery implements MessageDelivery { + + private final Message message; + private Destination destination; + private AsciiBuffer producerId; + private Runnable completionCallback; + + public OpenWireMessageDelivery(Message message) { + this.message = message; + } + + public Destination getDestination() { + if( destination == null ) { + destination = OpenwireBrokerConnection.convert(message.getDestination()); + } + return destination; + } + + public int getFlowLimiterSize() { + return message.getSize(); + } + + public int getPriority() { + return message.getPriority(); + } + + public AsciiBuffer getMsgId() { + return null; + } + + public AsciiBuffer getProducerId() { + if( producerId == null ) { + producerId = new AsciiBuffer(message.getProducerId().toString()); + } + return producerId; + } + + public Message getMessage() { + return message; + } + + public Runnable getCompletionCallback() { + return completionCallback; + } + + public void setCompletionCallback(Runnable completionCallback) { + this.completionCallback = completionCallback; + } + + public T asType(Class type) { + if( type == Message.class ) { + return type.cast(message); + } + // TODO: is this right? + if( message.getClass().isAssignableFrom(type) ) { + return type.cast(message); + } + return null; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,472 @@ +package org.apache.activemq.broker.openwire; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +import org.apache.activemq.broker.BrokerConnection; +import org.apache.activemq.broker.DeliveryTarget; +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.Router; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ControlCommand; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.FlushCommand; +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.LogicExpression; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NoLocalExpression; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowController; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.flow.ISinkController.FlowControllable; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.state.CommandVisitor; + +public class OpenwireBrokerConnection extends BrokerConnection { + + protected final HashMap producers = new HashMap(); + protected final HashMap consumers = new HashMap(); + + protected final Object inboundMutex = new Object(); + protected IFlowController inboundController; + + protected IFlowController outboundController; +// public ProtocolLimiter outboundLimiter; + protected Flow ouboundFlow; + + public void onCommand(Object o) { + final Command command = (Command) o; + boolean responseRequired = command.isResponseRequired(); + try { + command.visit(new CommandVisitor() { + + /////////////////////////////////////////////////////////////////// + // Methods that keep track of the client state + /////////////////////////////////////////////////////////////////// + public Response processAddConnection(ConnectionInfo info) throws Exception { + return ack(command); + } + public Response processAddSession(SessionInfo info) throws Exception { + return ack(command); + } + public Response processAddProducer(ProducerInfo info) throws Exception { + producers.put(info.getProducerId(), new ProducerContext(info)); + return ack(command); + } + public Response processAddConsumer(ConsumerInfo info) throws Exception { + ConsumerContext ctx = new ConsumerContext(info); + consumers.put(info.getConsumerId(), ctx); + + broker.getRouter().bind(convert(info.getDestination()), ctx); + + + return ack(command); + } + public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception { + return ack(command); + } + public Response processRemoveSession(SessionId info, long arg1) throws Exception { + return ack(command); + } + public Response processRemoveProducer(ProducerId info) throws Exception { + producers.remove(info); + return ack(command); + } + public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception { + return ack(command); + } + + /////////////////////////////////////////////////////////////////// + // Message Processing Methods. + /////////////////////////////////////////////////////////////////// + public Response processMessage(Message info) throws Exception { + ProducerId producerId = info.getProducerId(); + ProducerContext producerContext = producers.get(producerId); + + OpenWireMessageDelivery md = new OpenWireMessageDelivery(info); + + // Only producers that are not using a window will block, and if it blocks. + // yes we block the connection's read thread. yes other sessions will not get + // serviced while we block here. The producer is depending on TCP flow + // control to slow him down so we have to stop ready from the socket at this + // point. + while( !producerContext.controller.offer(md, null) ) { + producerContext.controller.waitForFlowUnblock(); + } + return null; + } + public Response processMessageAck(MessageAck info) throws Exception { + return ack(command); + } + public Response processMessagePull(MessagePull info) throws Exception { + return ack(command); + } + public Response processProducerAck(ProducerAck info) throws Exception { + return ack(command); + } + + /////////////////////////////////////////////////////////////////// + // Control Methods + /////////////////////////////////////////////////////////////////// + public Response processWireFormat(WireFormatInfo info) throws Exception { + return ack(command); + } + public Response processShutdown(ShutdownInfo info) throws Exception { + return ack(command); + } + public Response processKeepAlive(KeepAliveInfo info) throws Exception { + return ack(command); + } + public Response processFlush(FlushCommand info) throws Exception { + return ack(command); + } + public Response processConnectionControl(ConnectionControl info) throws Exception { + return ack(command); + } + public Response processConnectionError(ConnectionError info) throws Exception { + return ack(command); + } + public Response processConsumerControl(ConsumerControl info) throws Exception { + return ack(command); + } + + /////////////////////////////////////////////////////////////////// + // Methods for server management + /////////////////////////////////////////////////////////////////// + public Response processAddDestination(DestinationInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processRemoveDestination(DestinationInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processControlCommand(ControlCommand info) throws Exception { + throw new UnsupportedOperationException(); + } + + /////////////////////////////////////////////////////////////////// + // Methods for transaction management + /////////////////////////////////////////////////////////////////// + public Response processBeginTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processEndTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processForgetTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processRecoverTransactions(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + /////////////////////////////////////////////////////////////////// + // Methods for cluster operations + /////////////////////////////////////////////////////////////////// + public Response processBrokerInfo(BrokerInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processMessageDispatch(MessageDispatch info) throws Exception { + throw new UnsupportedOperationException(); + } + public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception { + throw new UnsupportedOperationException(); + } + }); + } catch (Exception e) { + if (responseRequired) { + ExceptionResponse response = new ExceptionResponse(e); + response.setCorrelationId(command.getCommandId()); + write(response); + } else { + onException(e); + } + + } + } + + + /////////////////////////////////////////////////////////////////// + // Internal Support Methods + /////////////////////////////////////////////////////////////////// + + private Response ack(Command command) { + Response rc = null; + if( command.isResponseRequired() ) { + rc = new Response(); + rc.setCorrelationId(command.getCommandId()); + } + return rc; + } + + @Override + public void start() throws Exception { + super.start(); + BrokerInfo info = new BrokerInfo(); + info.setBrokerId(new BrokerId(broker.getName())); + info.setBrokerName(broker.getName()); + info.setBrokerURL(broker.getConnectURI().toString()); + write(info); + } + + static class FlowControllableAdapter implements FlowControllable { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + } + + public IFlowSink getFlowSink() { + return null; + } + + public IFlowSource getFlowSource() { + return null; + } + } + + + class ConsumerContext implements DeliveryTarget { + + private final ConsumerInfo info; + private String name; + private BooleanExpression selector; + + public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException { + this.info = info; + this.name = info.getConsumerId().toString(); + selector = parseSelector(info); + } + + public IFlowSink getSink() { + // TODO Auto-generated method stub + return null; + } + + public boolean match(MessageDelivery message) { + Message msg = message.asType(Message.class); + if( msg ==null ) { + return false; + } + + MessageEvaluationContext selectorContext = new MessageEvaluationContext(); + selectorContext.setMessageReference(msg); + selectorContext.setDestination(info.getDestination()); + try { + return (selector == null || selector.matches(selectorContext)); + } catch (JMSException e) { + e.printStackTrace(); + return false; + } + } + + } + + private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException { + BooleanExpression rc = null; + if (info.getSelector() != null) { + rc = SelectorParser.parse(info.getSelector()); + } + if (info.isNoLocal()) { + if (rc == null) { + rc = new NoLocalExpression(info.getConsumerId().getConnectionId()); + } else { + rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc); + } + } + if (info.getAdditionalPredicate() != null) { + if (rc == null) { + rc = info.getAdditionalPredicate(); + } else { + rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc); + } + } + return rc; + } + + class ProducerContext { + + private final ProducerInfo info; + private IFlowController controller; + private String name; + + public ProducerContext(final ProducerInfo info) { + this.info = info; + this.name = info.getProducerId().toString(); + + // Openwire only uses credit windows at the producer level for producers that request the feature. + if( info.getWindowSize() > 0 ) { + Flow flow = new Flow(info.getProducerId().toString(), false); + + WindowLimiter limiter = new WindowLimiter(false, flow, info.getWindowSize(), info.getWindowSize()/2) { + @Override + protected void sendCredit(int credit) { + ProducerAck ack = new ProducerAck(info.getProducerId(), credit); + write(ack); + } + }; + + controller = new FlowController(new FlowControllableAdapter() { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + route(controller, elem); + } + public String toString() { + return name; + } + }, flow, limiter, inboundMutex); + } else { + controller = inboundController; + } + } + } + + protected void route(ISourceController controller, MessageDelivery elem) { + // TODO: + // Consider doing some caching of this target list. Most producers always send to + // the same destination. + Collection targets = broker.getRouter().route(elem); + + final Message message = ((OpenWireMessageDelivery)elem).getMessage(); + if( targets != null ) { + + if( message.isResponseRequired() ) { + // We need to ack the message once we ensure we won't loose it. + // We know we won't loose it once it's persisted or delivered to a consumer + // Setup a callback to get notifed once one of those happens. + if( message.isPersistent() ) { + elem.setCompletionCallback(new Runnable(){ + public void run() { + ack(message); + } + }); + } else { + // Let the client know the broker got the message. + ack(message); + } + } + + // Deliver the message to all the targets.. + for (DeliveryTarget dt : targets) { + if (dt.match(elem)) { + dt.getSink().add(elem, controller); + } + } + + } else { + // Let the client know we got the message even though there + // were no valid targets to deliver the message to. + if( message.isResponseRequired() ) { + ack(message); + } + } + controller.elementDispatched(elem); + } + + protected void initialize() { + + // Setup the input processing.. + Flow flow = new Flow(name, false); + SizeLimiter limiter = new SizeLimiter(inputWindowSize, inputResumeThreshold); + inboundController = new FlowController(new FlowControllableAdapter() { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + route(controller, elem); + } + public String toString() { + return name; + } + }, flow, limiter, inboundMutex); + +// ouboundFlow = new Flow(name, false); +// outboundLimiter = new WindowLimiter(true, ouboundFlow, outputWindowSize, outputResumeThreshold); +// outputQueue = new SingleFlowRelay(ouboundFlow, name + "-outbound", outboundLimiter); +// outboundController = outputQueue.getFlowController(ouboundFlow); +// +// if (transport instanceof DispatchableTransport) { +// outputQueue.setDrain(new IFlowDrain() { +// +// public void drain(MessageDelivery message, ISourceController controller) { +// write(message); +// } +// }); +// +// } else { +// blockingTransport = true; +// blockingWriter = Executors.newSingleThreadExecutor(); +// outputQueue.setDrain(new IFlowDrain() { +// public void drain(final MessageDelivery message, ISourceController controller) { +// write(message); +// }; +// }); +// +// } + } + + static public Destination convert(ActiveMQDestination dest) { + if( dest.isComposite() ) { + ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations(); + ArrayList d= new ArrayList(); + for (int i = 0; i < compositeDestinations.length; i++) { + d.add(convert(compositeDestinations[i])); + } + return new Destination.MultiDestination(d); + } + AsciiBuffer domain; + if( dest.isQueue() ) { + domain = Router.QUEUE_DOMAIN; + } if( dest.isTopic() ) { + domain = Router.TOPIC_DOMAIN; + } else { + throw new IllegalArgumentException("Unsupported domain type: "+ dest); + } + return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName())); + } + +} Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/MockBrokerTest.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.openwire; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.Queue; +import org.apache.activemq.broker.Router; +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.dispatch.PriorityDispatcher; +import org.apache.activemq.metric.MetricAggregator; +import org.apache.activemq.metric.Period; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.Mapper; + +public class MockBrokerTest extends TestCase { + + protected static final int PERFORMANCE_SAMPLES = 3; + + protected static final int IO_WORK_AMOUNT = 0; + protected static final int FANIN_COUNT = 10; + protected static final int FANOUT_COUNT = 10; + + protected static final int PRIORITY_LEVELS = 10; + protected static final boolean USE_INPUT_QUEUES = true; + + // Set to put senders and consumers on separate brokers. + protected boolean multibroker = false; + + // Set to mockup up ptp: + protected boolean ptp = false; + + // Set to use tcp IO + protected boolean tcp = true; + // set to force marshalling even in the NON tcp case. + protected boolean forceMarshalling = false; + + protected String sendBrokerURI; + protected String receiveBrokerURI; + + // Set's the number of threads to use: + protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors(); + protected boolean usePartitionedQueue = false; + + protected int producerCount; + protected int consumerCount; + protected int destCount; + + protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items"); + protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items"); + + protected Broker sendBroker; + protected Broker rcvBroker; + protected ArrayList brokers = new ArrayList(); + protected IDispatcher dispatcher; + protected final AtomicLong msgIdGenerator = new AtomicLong(); + + final ArrayList producers = new ArrayList(); + final ArrayList consumers = new ArrayList(); + + static public final Mapper KEY_MAPPER = new Mapper() { + public AsciiBuffer map(MessageDelivery element) { + return element.getMsgId(); + } + }; + static public final Mapper PARTITION_MAPPER = new Mapper() { + public Integer map(MessageDelivery element) { + // we modulo 10 to have at most 10 partitions which the producers + // gets split across. + return (int) (element.getProducerId().hashCode() % 10); + } + }; + + @Override + protected void setUp() throws Exception { + dispatcher = createDispatcher(); + dispatcher.start(); + if (tcp) { + sendBrokerURI = "tcp://localhost:10000?wireFormat=proto2"; + receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto2"; + } else { + if (forceMarshalling) { + sendBrokerURI = "pipe://SendBroker?wireFormat=proto"; + receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto"; + } else { + sendBrokerURI = "pipe://SendBroker"; + receiveBrokerURI = "pipe://ReceiveBroker"; + } + } + } + + protected IDispatcher createDispatcher() { + return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize); + } + + public void test_10_10_10() throws Exception { + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_1_1_0() throws Exception { + producerCount = 1; + destCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_1_1_1() throws Exception { + producerCount = 1; + destCount = 1; + consumerCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_10_1_10() throws Exception { + producerCount = FANIN_COUNT; + consumerCount = FANOUT_COUNT; + destCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_10_1_1() throws Exception { + producerCount = FANIN_COUNT; + destCount = 1; + consumerCount = 1; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_1_1_10() throws Exception { + producerCount = 1; + destCount = 1; + consumerCount = FANOUT_COUNT; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_2_2_2() throws Exception { + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + /** + * Tests 2 producers sending to 1 destination with 2 consumres, but with + * consumers set to select only messages from each producer. 1 consumers is + * set to slow, the other producer should be able to send quickly. + * + * @throws Exception + */ + public void test_2_2_2_SlowConsumer() throws Exception { + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + consumers.get(0).setThinkTime(50); + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + public void test_2_2_2_Selector() throws Exception { + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + + // Add properties to match producers to their consumers + for (int i = 0; i < consumerCount; i++) { + String property = "match" + i; + consumers.get(i).setSelector(property); + producers.get(i).setProperty(property); + } + + // Start 'em up. + startServices(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + /** + * Test sending with 1 high priority sender. The high priority sender should + * have higher throughput than the other low priority senders. + * + * @throws Exception + */ + public void test_2_1_1_HighPriorityProducer() throws Exception { + + producerCount = 2; + destCount = 1; + consumerCount = 1; + + createConnections(); + RemoteProducer producer = producers.get(0); + producer.setPriority(1); + producer.getRate().setName("High Priority Producer Rate"); + + consumers.get(0).setThinkTime(1); + + // Start 'em up. + startServices(); + try { + + System.out.println("Checking rates for test: " + getName()); + for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { + Period p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(producer.getRate().getRateSummary(p)); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + + } finally { + stopServices(); + } + } + + /** + * Test sending with 1 high priority sender. The high priority sender should + * have higher throughput than the other low priority senders. + * + * @throws Exception + */ + public void test_2_1_1_MixedHighPriorityProducer() throws Exception { + producerCount = 2; + destCount = 1; + consumerCount = 1; + + createConnections(); + RemoteProducer producer = producers.get(0); + producer.setPriority(1); + producer.setPriorityMod(3); + producer.getRate().setName("High Priority Producer Rate"); + + consumers.get(0).setThinkTime(1); + + // Start 'em up. + startServices(); + try { + + System.out.println("Checking rates for test: " + getName()); + for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { + Period p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(producer.getRate().getRateSummary(p)); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + + } finally { + stopServices(); + } + } + + private void reportRates() throws InterruptedException { + System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic")); + for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { + Period p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + } + + private void createConnections() throws IOException, URISyntaxException { + + if (multibroker) { + sendBroker = createBroker("SendBroker", sendBrokerURI); + rcvBroker = createBroker("RcvBroker", receiveBrokerURI); + brokers.add(sendBroker); + brokers.add(rcvBroker); + } else { + sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI); + brokers.add(sendBroker); + } + + Destination[] dests = new Destination[destCount]; + + for (int i = 0; i < destCount; i++) { + Destination.SingleDestination bean = new Destination.SingleDestination(); + bean.setName(new AsciiBuffer("dest" + (i + 1))); + bean.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN); + dests[i] = bean; + if (ptp) { + Queue queue = createQueue(sendBroker, dests[i]); + sendBroker.addQueue(queue); + if (multibroker) { + queue = createQueue(rcvBroker, dests[i]); + rcvBroker.addQueue(queue); + } + } + } + + for (int i = 0; i < producerCount; i++) { + Destination destination = dests[i % destCount]; + RemoteProducer producer = createProducer(i, destination); + producers.add(producer); + } + + for (int i = 0; i < consumerCount; i++) { + Destination destination = dests[i % destCount]; + RemoteConsumer consumer = createConsumer(i, destination); + consumers.add(consumer); + } + + // Create MultiBroker connections: + // if (multibroker) { + // Pipe pipe = new Pipe(); + // sendBroker.createBrokerConnection(rcvBroker, pipe); + // rcvBroker.createBrokerConnection(sendBroker, pipe.connect()); + // } + } + + private RemoteConsumer createConsumer(int i, Destination destination) { + RemoteConsumer consumer = new RemoteConsumer(); + consumer.setUri(rcvBroker.getConnectURI()); + consumer.setDestination(destination); + consumer.setName("consumer" + (i + 1)); + consumer.setTotalConsumerRate(totalConsumerRate); + consumer.setDispatcher(dispatcher); + return consumer; + } + + private RemoteProducer createProducer(int id, Destination destination) { + RemoteProducer producer = new RemoteProducer(); + producer.setUri(sendBroker.getConnectURI()); + producer.setProducerId(id + 1); + producer.setName("producer" + (id + 1)); + producer.setDestination(destination); + producer.setMessageIdGenerator(msgIdGenerator); + producer.setTotalProducerRate(totalProducerRate); + producer.setDispatcher(dispatcher); + return producer; + } + + private Queue createQueue(Broker broker, Destination destination) { + Queue queue = new Queue(); + queue.setBroker(broker); + queue.setDestination(destination); + queue.setKeyExtractor(KEY_MAPPER); + if (usePartitionedQueue) { + queue.setPartitionMapper(PARTITION_MAPPER); + } + return queue; + } + + private Broker createBroker(String name, String uri) { + Broker broker = new Broker(); + broker.setName(name); + broker.setUri(uri); + broker.setDispatcher(dispatcher); + return broker; + } + + private void stopServices() throws Exception { + for (RemoteProducer connection : producers) { + connection.stop(); + } + for (RemoteConsumer connection : consumers) { + connection.stop(); + } + for (Broker broker : brokers) { + broker.stop(); + } + if (dispatcher != null) { + dispatcher.shutdown(); + } + } + + private void startServices() throws Exception { + for (Broker broker : brokers) { + broker.start(); + } + for (RemoteConsumer connection : consumers) { + connection.start(); + } + + for (RemoteProducer connection : producers) { + connection.start(); + } + } + +} Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenWireSupport.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,83 @@ +package org.apache.activemq.broker.openwire; + +import javax.jms.MessageNotWriteableException; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.SessionInfo; + +public class OpenWireSupport { + + static private long idGenerator; + static private int msgIdGenerator; + static private int txGenerator; + static private int tempDestGenerator; + + public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); + info.setBrowser(false); + info.setDestination(destination); + info.setPrefetchSize(1000); + info.setDispatchAsync(false); + return info; + } + + public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) { + return consumerInfo.createRemoveCommand(); + } + + public static ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception { + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); + return info; + } + + public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); + return info; + } + + public static ConnectionInfo createConnectionInfo() throws Exception { + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(new ConnectionId("connection:" + (++idGenerator))); + info.setClientId(info.getConnectionId().getValue()); + return info; + } + + public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) { + return createMessage(producerInfo, destination, 4, null); + } + + public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int priority, String payload) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setJMSPriority(priority); + message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator)); + message.setDestination(destination); + message.setPersistent(false); + if( payload!=null ) { + try { + message.setText(payload); + } catch (MessageNotWriteableException e) { + } + } + return message; + } + + public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) { + MessageAck ack = new MessageAck(); + ack.setAckType(ackType); + ack.setConsumerId(consumerInfo.getConsumerId()); + ack.setDestination(msg.getDestination()); + ack.setLastMessageId(msg.getMessageId()); + ack.setMessageCount(count); + return ack; + } + +} Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=752973&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 18:59:12 2009 @@ -0,0 +1,169 @@ +package org.apache.activemq.broker.openwire; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.Connection; +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.flow.ISinkController.FlowControllable; +import org.apache.activemq.metric.MetricAggregator; +import org.apache.activemq.metric.MetricCounter; +import org.apache.activemq.transport.DispatchableTransport; +import org.apache.activemq.transport.TransportFactory; + +public class RemoteConsumer extends Connection { + + private final MetricCounter consumerRate = new MetricCounter(); + + private MetricAggregator totalConsumerRate; + private long thinkTime; + private Destination destination; + private String selector; + private URI uri; + + private boolean schedualWait; + + protected final Object inboundMutex = new Object(); + private FlowController inboundController; + + public void start() throws Exception { + consumerRate.name("Consumer " + name + " Rate"); + totalConsumerRate.add(consumerRate); + + transport = TransportFactory.compositeConnect(uri); + if(transport instanceof DispatchableTransport) + { + DispatchableTransport dt = ((DispatchableTransport)transport); + dt.setName(name + "-client-transport"); + dt.setDispatcher(getDispatcher()); + schedualWait = true; + } + transport.setTransportListener(this); + transport.start(); + + // Let the remote side know our name. + transport.oneway(name); + // Sending the destination acts as the subscribe. + transport.oneway(destination); + super.initialize(); + } + + protected void initialize() { + + // Setup the input processing.. + Flow flow = new Flow(name, false); + SizeLimiter limiter = new SizeLimiter(inputWindowSize, inputResumeThreshold); + inboundController = new FlowController(new FlowControllable() { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + messageReceived(controller, elem); + } + public String toString() { + return name; + } + public IFlowSink getFlowSink() { + return null; + } + public IFlowSource getFlowSource() { + return null; + } + }, flow, limiter, inboundMutex); + } + + public void onCommand(Object command) { + try { + if (command.getClass() == MessageDelivery.class) { + MessageDelivery msg = (MessageDelivery) command; + inboundController.add(msg, null); + } else { + onException(new Exception("Unrecognized command: " + command)); + } + } catch (Exception e) { + onException(e); + } + } + + protected void messageReceived(final ISourceController controller, final MessageDelivery elem) { + if( schedualWait ) { + if (thinkTime > 0) { + getDispatcher().schedule(new Runnable(){ + + public void run() { + consumerRate.increment(); + controller.elementDispatched(elem); + } + + }, thinkTime, TimeUnit.MILLISECONDS); + + } + else + { + consumerRate.increment(); + controller.elementDispatched(elem); + } + + } else { + if( thinkTime>0 ) { + try { + Thread.sleep(thinkTime); + } catch (InterruptedException e) { + } + } + consumerRate.increment(); + controller.elementDispatched(elem); + } + } + + public void setName(String name) { + this.name = name; + } + + public MetricAggregator getTotalConsumerRate() { + return totalConsumerRate; + } + + public void setTotalConsumerRate(MetricAggregator totalConsumerRate) { + this.totalConsumerRate = totalConsumerRate; + } + + public Destination getDestination() { + return destination; + } + + public void setDestination(Destination destination) { + this.destination = destination; + } + + public long getThinkTime() { + return thinkTime; + } + + public void setThinkTime(long thinkTime) { + this.thinkTime = thinkTime; + } + + public MetricCounter getConsumerRate() { + return consumerRate; + } + + public String getSelector() { + return selector; + } + + public void setSelector(String selector) { + this.selector = selector; + } + + public URI getUri() { + return uri; + } + + public void setUri(URI uri) { + this.uri = uri; + }}