activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961114 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Wed, 07 Jul 2010 03:59:19 GMT
Author: chirino
Date: Wed Jul  7 03:59:19 2010
New Revision: 961114

URL: http://svn.apache.org/viewvc?rev=961114&view=rev
Log:
Queue is now holding QueueEntry objects

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961114&r1=961113&r2=961114&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:59:19 2010
@@ -26,9 +26,17 @@ import org.fusesource.hawtbuf._
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliveryProducer {
+
   def dispatchQueue:DispatchQueue
-  def collocate(queue:DispatchQueue):Unit
   def ack(message:Delivery) = {}
+
+  def collocate(value:DispatchQueue):Unit = {
+    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
+      println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
+    }
+  }
+
 }
 
 /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961114&r1=961113&r2=961114&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 03:59:19 2010
@@ -17,9 +17,9 @@
 package org.apache.activemq.apollo.broker
 
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
 import collection.{SortedMap}
 import java.util.LinkedList
+import org.fusesource.hawtdispatch.{EventAggregators, DispatchQueue, BaseRetained}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -42,6 +42,10 @@ trait QueueLifecyleListener {
 
 }
 
+object QueueEntry extends Sizer[QueueEntry] {
+  def size(value:QueueEntry):Int = value.delivery.size
+}
+class QueueEntry(val seq:Long, val delivery:Delivery)
 
 object Queue extends Log {
   val maxOutboundSize = 1024*1204*5
@@ -59,63 +63,28 @@ class Queue(val destination:Destination,
   dispatchQueue {
     debug("created queue for: "+destination)
   }
+  setDisposer(^{
+    dispatchQueue.release
+    session_manager.release
+  })
 
   // sequence numbers.. used to track what's in the store.
   var first_seq = -1L
   var last_seq = -1L
   var message_seq_counter=0L
-  def next_message_seq = {
-    val rc = message_seq_counter
-    message_seq_counter += 1
-    rc
-  }
-
   var count = 0
 
-  val pending = new QueueSink[Delivery](Delivery) {
-    override def offer(delivery: Delivery) = {
-      val d = delivery.copy
-      d.ack = true
-      super.offer(d)
-    }
-  }
-  val session_manager = new SinkMux[Delivery](pending, dispatchQueue, Delivery)
-
-  pending.drainer = ^{ drain_delivery_buffer }
-
-  def drain_delivery_buffer: Unit = {
-    while (!readyConsumers.isEmpty && !pending.isEmpty) {
-      val cs = readyConsumers.removeFirst
-      val delivery = pending.poll
-      if( cs.session.offer(delivery) ) {
-        // consumer was not full.. keep him in the ready list
-        readyConsumers.addLast(cs)
-      } else {
-        // consumer full.
-        cs.ready = false
-        pending.unpoll(delivery)
-      }
-    }
-  }
+  val pending = new QueueSink[QueueEntry](QueueEntry)
+  val session_manager = new SinkMux[Delivery](MapSink(pending){ x=>accept(x) }, dispatchQueue, Delivery)
+  pending.drainer = ^{ drain }
 
 
-  // Use an event source to coalesce cross thread synchronization.
-  val ack_source = createSource(new ListEventAggregator[Delivery](), dispatchQueue)
-  ack_source.setEventHandler(^{drain_acks});
-  ack_source.resume
-  def drain_acks = {
-    ack_source.getData.foreach { ack =>
-      pending.ack(ack)
-    }
-  }
-  override def ack(ack:Delivery) = {
-    ack_source.merge(ack)
-  }
-
-  setDisposer(^{
-    dispatchQueue.release
-    session_manager.release
-  })
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the Route trait.  Allows consumers to bind/unbind
+  // from this queue so that it can send messages to them.
+  //
+  /////////////////////////////////////////////////////////////////////
 
   class ConsumerState(val session:Session) extends Runnable {
     session.refiller = this
@@ -126,7 +95,7 @@ class Queue(val destination:Destination,
       if( bound && !ready ) {
         ready = true
         readyConsumers.addLast(this)
-        drain_delivery_buffer
+        drain
       }
     }
   }
@@ -141,7 +110,7 @@ class Queue(val destination:Destination,
         allConsumers += consumer->cs
         readyConsumers.addLast(cs)
       }
-      drain_delivery_buffer
+      drain
     } >>: dispatchQueue
 
   def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
@@ -159,12 +128,14 @@ class Queue(val destination:Destination,
 
   def disconnected() = throw new RuntimeException("unsupported")
 
-  def collocate(value:DispatchQueue):Unit = {
-    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
-      println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
-      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
-    }
-  }
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the DeliveryConsumer trait.  Allows this queue
+  // to receive messages from producers.
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def matches(message:Delivery) = { true }
 
   def connect(p:DeliveryProducer) = new Session {
     retain
@@ -185,10 +156,7 @@ class Queue(val destination:Destination,
       if( session.full ) {
         false
       } else {
-        if( value.ref !=null ) {
-          value.ref.retain
-        }
-        val rc = session.offer(value)
+        val rc = session.offer(sent(value))
         assert(rc, "session should accept since it was not full")
         true
       }
@@ -198,143 +166,76 @@ class Queue(val destination:Destination,
     def refiller_=(value:Runnable) = { session.refiller=value }
   }
 
-  def matches(message:Delivery) = { true }
-
-//  def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
-//    val consumer = StompQueue.this
-//    val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
-//    retain
-//
-//    def deliver(delivery:Delivery) = using(delivery) {
-//      deliveryQueue.send(delivery)
-//    } >>: queue
-//
-//    def close = {
-//      release
-//    }
-//  }
 
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the DeliveryProducer trait.
+  // It mainly deals with handling message acks from bound consumers.
+  //
+  /////////////////////////////////////////////////////////////////////
 
-}
+  val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
+  ack_source.setEventHandler(^{drain_acks});
+  ack_source.resume
+  def drain_acks = {
+    pending.ack(ack_source.getData.intValue)
+  }
+  override def ack(ack:Delivery) = {
+    ack_source.merge(ack.size)
+  }
 
-class XQueue(val destination:Destination) {
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation methods.
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  /**
+   * Called from the producer thread before the delivery is
+   * processed by the queues' thread.. therefore we don't
+   * yet know the order of the delivery in the queue.
+   */
+  private def sent(delivery:Delivery) = {
+    if( delivery.ref !=null ) {
+      delivery.ref.retain
+    }
+    delivery
+  }
+
+  /**
+   * Called from the queue thread.  At this point we
+   * know the order.  Converts the delivery to a QueueEntry
+   */
+  private def accept(delivery:Delivery) = {
+    val d = delivery.copy
+    d.ack = true
+    new QueueEntry(next_message_seq, d)
+  }
+
+  /**
+   * Dispatches as many messages to ready consumers
+   * as possible.
+   */
+  private def drain: Unit = {
+    while (!readyConsumers.isEmpty && !pending.isEmpty) {
+      val cs = readyConsumers.removeFirst
+      val queueEntry = pending.poll
+      if( cs.session.offer(queueEntry.delivery) ) {
+        // consumer was not full.. keep him in the ready list
+        readyConsumers.addLast(cs)
+      } else {
+        // consumer full.
+        cs.ready = false
+        pending.unpoll(queueEntry)
+      }
+    }
+  }
 
-// TODO:
-//    private VirtualHost virtualHost;
-//
-//    Queue() {
-//        this.queue = queue;
-//    }
-//
-//    /*
-//     * (non-Javadoc)
-//     *
-//     * @see
-//     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
-//     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
-//     */
-//    public void deliver(MessageDelivery message, ISourceController<?> source) {
-//        queue.add(message, source);
-//    }
-//
-//    public final void addSubscription(final Subscription<MessageDelivery> sub) {
-//        queue.addSubscription(sub);
-//    }
-//
-//    public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
-//        return queue.removeSubscription(sub);
-//    }
-//
-//    public void start() throws Exception {
-//        queue.start();
-//    }
-//
-//    public void stop() throws Exception {
-//        if (queue != null) {
-//            queue.stop();
-//        }
-//    }
-//
-//    public void shutdown(Runnable onShutdown) throws Exception {
-//        if (queue != null) {
-//            queue.shutdown(onShutdown);
-//        }
-//    }
-//
-//    public boolean hasSelector() {
-//        return false;
-//    }
-//
-//    public boolean matches(MessageDelivery message) {
-//        return true;
-//    }
-//
-//    public VirtualHost getBroker() {
-//        return virtualHost;
-//    }
-//
-//    public void setVirtualHost(VirtualHost virtualHost) {
-//        this.virtualHost = virtualHost;
-//    }
-//
-//    public void setDestination(Destination destination) {
-//        this.destination = destination;
-//    }
-//
-//    public final Destination getDestination() {
-//        return destination;
-//    }
-//
-//    public boolean isDurable() {
-//        return true;
-//    }
-//
-//    public static class QueueSubscription implements BrokerSubscription {
-//        Subscription<MessageDelivery> subscription;
-//        final Queue queue;
-//
-//        public QueueSubscription(Queue queue) {
-//            this.queue = queue;
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
-//         * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-//         */
-//        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
-//            this.subscription = subscription;
-//            queue.addSubscription(subscription);
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
-//         * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-//         */
-//        public void disconnect(ConsumerContext context) {
-//            queue.removeSubscription(subscription);
-//        }
-//
-//        /* (non-Javadoc)
-//         * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
-//         */
-//        public Destination getDestination() {
-//            return queue.getDestination();
-//        }
-//    }
-
-  // TODO:
-  def matches(message:Delivery) = false
-  def deliver(message:Delivery) = {
-    // TODO:
+  private def next_message_seq = {
+    val rc = message_seq_counter
+    message_seq_counter += 1
+    rc
   }
 
-  def getDestination() = destination
 
-  def shutdown = {}
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=961114&r1=961113&r2=961114&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Jul  7 03:59:19 2010
@@ -329,12 +329,12 @@ class QueueSink[T](val sizer:Sizer[T], v
     }
   }
 
-  def ack(delivery:T) = {
+  def ack(amount:Int) = {
     // When a message is delivered to the consumer, we release
     // used capacity in the outbound queue, and can drain the inbound
     // queue
     val wasBlocking = full
-    size -= sizer.size(delivery)
+    size -= amount
     if( !isEmpty ) {
       drain
     } else {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961114&r1=961113&r2=961114&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:59:19 2010
@@ -209,16 +209,6 @@ class StompProtocolHandler extends Proto
 
           val producer = new DeliveryProducer() {
             override def dispatchQueue = queue
-            override def collocate(value:DispatchQueue):Unit = ^{
-//              TODO:
-//              if( value.getTargetQueue ne queue.getTargetQueue ) {
-//                println("sender on "+queue.getLabel+" co-locating with: "+value.getLabel);
-//                queue.setTargetQueue(value.getTargetQueue)
-//                write_source.setTargetQueue(queue);
-//                read_source.setTargetQueue(queue)
-//              }
-
-            } >>: queue
           }
 
           // don't process frames until we are connected..



Mime
View raw message