Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 38061 invoked from network); 7 Jul 2010 04:00:17 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 04:00:17 -0000 Received: (qmail 39851 invoked by uid 500); 7 Jul 2010 04:00:17 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 39794 invoked by uid 500); 7 Jul 2010 04:00:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 39787 invoked by uid 99); 7 Jul 2010 04:00:16 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:00:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:00:13 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 813C623889DD; Wed, 7 Jul 2010 03:59:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961114 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ Date: Wed, 07 Jul 2010 03:59:19 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707035919.813C623889DD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Jul 7 03:59:19 2010 New Revision: 961114 URL: http://svn.apache.org/viewvc?rev=961114&view=rev Log: Queue is now holding QueueEntry objects Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961114&r1=961113&r2=961114&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:59:19 2010 @@ -26,9 +26,17 @@ import org.fusesource.hawtbuf._ * @author Hiram Chirino */ trait DeliveryProducer { + def dispatchQueue:DispatchQueue - def collocate(queue:DispatchQueue):Unit def ack(message:Delivery) = {} + + def collocate(value:DispatchQueue):Unit = { + if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) { + println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel); + this.dispatchQueue.setTargetQueue(value.getTargetQueue) + } + } + } /** Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961114&r1=961113&r2=961114&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 03:59:19 2010 @@ -17,9 +17,9 @@ package org.apache.activemq.apollo.broker import _root_.org.fusesource.hawtdispatch.ScalaDispatch._ -import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained} import collection.{SortedMap} import java.util.LinkedList +import org.fusesource.hawtdispatch.{EventAggregators, DispatchQueue, BaseRetained} /** * @author Hiram Chirino @@ -42,6 +42,10 @@ trait QueueLifecyleListener { } +object QueueEntry extends Sizer[QueueEntry] { + def size(value:QueueEntry):Int = value.delivery.size +} +class QueueEntry(val seq:Long, val delivery:Delivery) object Queue extends Log { val maxOutboundSize = 1024*1204*5 @@ -59,63 +63,28 @@ class Queue(val destination:Destination, dispatchQueue { debug("created queue for: "+destination) } + setDisposer(^{ + dispatchQueue.release + session_manager.release + }) // sequence numbers.. used to track what's in the store. var first_seq = -1L var last_seq = -1L var message_seq_counter=0L - def next_message_seq = { - val rc = message_seq_counter - message_seq_counter += 1 - rc - } - var count = 0 - val pending = new QueueSink[Delivery](Delivery) { - override def offer(delivery: Delivery) = { - val d = delivery.copy - d.ack = true - super.offer(d) - } - } - val session_manager = new SinkMux[Delivery](pending, dispatchQueue, Delivery) - - pending.drainer = ^{ drain_delivery_buffer } - - def drain_delivery_buffer: Unit = { - while (!readyConsumers.isEmpty && !pending.isEmpty) { - val cs = readyConsumers.removeFirst - val delivery = pending.poll - if( cs.session.offer(delivery) ) { - // consumer was not full.. keep him in the ready list - readyConsumers.addLast(cs) - } else { - // consumer full. - cs.ready = false - pending.unpoll(delivery) - } - } - } + val pending = new QueueSink[QueueEntry](QueueEntry) + val session_manager = new SinkMux[Delivery](MapSink(pending){ x=>accept(x) }, dispatchQueue, Delivery) + pending.drainer = ^{ drain } - // Use an event source to coalesce cross thread synchronization. - val ack_source = createSource(new ListEventAggregator[Delivery](), dispatchQueue) - ack_source.setEventHandler(^{drain_acks}); - ack_source.resume - def drain_acks = { - ack_source.getData.foreach { ack => - pending.ack(ack) - } - } - override def ack(ack:Delivery) = { - ack_source.merge(ack) - } - - setDisposer(^{ - dispatchQueue.release - session_manager.release - }) + ///////////////////////////////////////////////////////////////////// + // + // Implementation of the Route trait. Allows consumers to bind/unbind + // from this queue so that it can send messages to them. + // + ///////////////////////////////////////////////////////////////////// class ConsumerState(val session:Session) extends Runnable { session.refiller = this @@ -126,7 +95,7 @@ class Queue(val destination:Destination, if( bound && !ready ) { ready = true readyConsumers.addLast(this) - drain_delivery_buffer + drain } } } @@ -141,7 +110,7 @@ class Queue(val destination:Destination, allConsumers += consumer->cs readyConsumers.addLast(cs) } - drain_delivery_buffer + drain } >>: dispatchQueue def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) { @@ -159,12 +128,14 @@ class Queue(val destination:Destination, def disconnected() = throw new RuntimeException("unsupported") - def collocate(value:DispatchQueue):Unit = { - if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) { - println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel); - this.dispatchQueue.setTargetQueue(value.getTargetQueue) - } - } + ///////////////////////////////////////////////////////////////////// + // + // Implementation of the DeliveryConsumer trait. Allows this queue + // to receive messages from producers. + // + ///////////////////////////////////////////////////////////////////// + + def matches(message:Delivery) = { true } def connect(p:DeliveryProducer) = new Session { retain @@ -185,10 +156,7 @@ class Queue(val destination:Destination, if( session.full ) { false } else { - if( value.ref !=null ) { - value.ref.retain - } - val rc = session.offer(value) + val rc = session.offer(sent(value)) assert(rc, "session should accept since it was not full") true } @@ -198,143 +166,76 @@ class Queue(val destination:Destination, def refiller_=(value:Runnable) = { session.refiller=value } } - def matches(message:Delivery) = { true } - -// def open_session(producer_queue:DispatchQueue) = new ConsumerSession { -// val consumer = StompQueue.this -// val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer) -// retain -// -// def deliver(delivery:Delivery) = using(delivery) { -// deliveryQueue.send(delivery) -// } >>: queue -// -// def close = { -// release -// } -// } + ///////////////////////////////////////////////////////////////////// + // + // Implementation of the DeliveryProducer trait. + // It mainly deals with handling message acks from bound consumers. + // + ///////////////////////////////////////////////////////////////////// -} + val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatchQueue) + ack_source.setEventHandler(^{drain_acks}); + ack_source.resume + def drain_acks = { + pending.ack(ack_source.getData.intValue) + } + override def ack(ack:Delivery) = { + ack_source.merge(ack.size) + } -class XQueue(val destination:Destination) { + ///////////////////////////////////////////////////////////////////// + // + // Implementation methods. + // + ///////////////////////////////////////////////////////////////////// + + /** + * Called from the producer thread before the delivery is + * processed by the queues' thread.. therefore we don't + * yet know the order of the delivery in the queue. + */ + private def sent(delivery:Delivery) = { + if( delivery.ref !=null ) { + delivery.ref.retain + } + delivery + } + + /** + * Called from the queue thread. At this point we + * know the order. Converts the delivery to a QueueEntry + */ + private def accept(delivery:Delivery) = { + val d = delivery.copy + d.ack = true + new QueueEntry(next_message_seq, d) + } + + /** + * Dispatches as many messages to ready consumers + * as possible. + */ + private def drain: Unit = { + while (!readyConsumers.isEmpty && !pending.isEmpty) { + val cs = readyConsumers.removeFirst + val queueEntry = pending.poll + if( cs.session.offer(queueEntry.delivery) ) { + // consumer was not full.. keep him in the ready list + readyConsumers.addLast(cs) + } else { + // consumer full. + cs.ready = false + pending.unpoll(queueEntry) + } + } + } -// TODO: -// private VirtualHost virtualHost; -// -// Queue() { -// this.queue = queue; -// } -// -// /* -// * (non-Javadoc) -// * -// * @see -// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq -// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController) -// */ -// public void deliver(MessageDelivery message, ISourceController source) { -// queue.add(message, source); -// } -// -// public final void addSubscription(final Subscription sub) { -// queue.addSubscription(sub); -// } -// -// public boolean removeSubscription(final Subscription sub) { -// return queue.removeSubscription(sub); -// } -// -// public void start() throws Exception { -// queue.start(); -// } -// -// public void stop() throws Exception { -// if (queue != null) { -// queue.stop(); -// } -// } -// -// public void shutdown(Runnable onShutdown) throws Exception { -// if (queue != null) { -// queue.shutdown(onShutdown); -// } -// } -// -// public boolean hasSelector() { -// return false; -// } -// -// public boolean matches(MessageDelivery message) { -// return true; -// } -// -// public VirtualHost getBroker() { -// return virtualHost; -// } -// -// public void setVirtualHost(VirtualHost virtualHost) { -// this.virtualHost = virtualHost; -// } -// -// public void setDestination(Destination destination) { -// this.destination = destination; -// } -// -// public final Destination getDestination() { -// return destination; -// } -// -// public boolean isDurable() { -// return true; -// } -// -// public static class QueueSubscription implements BrokerSubscription { -// Subscription subscription; -// final Queue queue; -// -// public QueueSubscription(Queue queue) { -// this.queue = queue; -// } -// -// /* -// * (non-Javadoc) -// * -// * @see -// * org.apache.activemq.broker.BrokerSubscription#connect(org.apache. -// * activemq.broker.protocol.ProtocolHandler.ConsumerContext) -// */ -// public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException { -// this.subscription = subscription; -// queue.addSubscription(subscription); -// } -// -// /* -// * (non-Javadoc) -// * -// * @see -// * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache -// * .activemq.broker.protocol.ProtocolHandler.ConsumerContext) -// */ -// public void disconnect(ConsumerContext context) { -// queue.removeSubscription(subscription); -// } -// -// /* (non-Javadoc) -// * @see org.apache.activemq.broker.BrokerSubscription#getDestination() -// */ -// public Destination getDestination() { -// return queue.getDestination(); -// } -// } - - // TODO: - def matches(message:Delivery) = false - def deliver(message:Delivery) = { - // TODO: + private def next_message_seq = { + val rc = message_seq_counter + message_seq_counter += 1 + rc } - def getDestination() = destination - def shutdown = {} } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=961114&r1=961113&r2=961114&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Jul 7 03:59:19 2010 @@ -329,12 +329,12 @@ class QueueSink[T](val sizer:Sizer[T], v } } - def ack(delivery:T) = { + def ack(amount:Int) = { // When a message is delivered to the consumer, we release // used capacity in the outbound queue, and can drain the inbound // queue val wasBlocking = full - size -= sizer.size(delivery) + size -= amount if( !isEmpty ) { drain } else { Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961114&r1=961113&r2=961114&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:59:19 2010 @@ -209,16 +209,6 @@ class StompProtocolHandler extends Proto val producer = new DeliveryProducer() { override def dispatchQueue = queue - override def collocate(value:DispatchQueue):Unit = ^{ -// TODO: -// if( value.getTargetQueue ne queue.getTargetQueue ) { -// println("sender on "+queue.getLabel+" co-locating with: "+value.getLabel); -// queue.setTargetQueue(value.getTargetQueue) -// write_source.setTargetQueue(queue); -// read_source.setTargetQueue(queue) -// } - - } >>: queue } // don't process frames until we are connected..