activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961076 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apoll...
Date Wed, 07 Jul 2010 03:45:00 GMT
Author: chirino
Date: Wed Jul  7 03:44:59 2010
New Revision: 961076

URL: http://svn.apache.org/viewvc?rev=961076&view=rev
Log:
simple 1-1-1 case seems to be working.. perf needs some work tho

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala?rev=961076&r1=961075&r2=961076&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:44:59 2010
@@ -211,6 +211,7 @@ class DeliveryProducerRoute(val destinat
 
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
+      println("producer route attaching to conusmer.")
       targets = x.open_session(queue) :: targets
     }
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961076&r1=961075&r2=961076&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul  7 03:44:59 2010
@@ -58,7 +58,7 @@ object DestinationParser {
             return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
         } else {
             if( options.defaultDomain==null ) {
-                throw new IllegalArgumentException("Destination domain not provided: "+value);
+                return null;
             }
             return new SingleDestination(options.defaultDomain, value);
         }
@@ -81,7 +81,11 @@ object DestinationParser {
             var rc = value.split(compositeSeparator);
             var md = new MultiDestination();
             for (buffer <- rc) {
-                md.destinations ::= parse(buffer, options)
+              val d = parse(buffer, options)
+              if( d==null ) {
+                return null;
+              }
+              md.destinations = md.destinations ::: d :: Nil
             }
             return md;
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961076&r1=961075&r2=961076&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul  7 03:44:59 2010
@@ -61,8 +61,8 @@ trait Logging {
         format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
       }
       val exception_id = next_exception_id
-      log.log.error(log_map(m)+" [id:"+exception_id+"]")
-      log.log.debug("[id:"+exception_id+"]", e)
+      log.log.error(log_map(m)+" (ref:"+exception_id+")")
+      log.log.debug("(ref:"+exception_id+")", e)
     }
   }
 
@@ -70,8 +70,8 @@ trait Logging {
     val l = log.log
     if( l.isErrorEnabled ) {
       val exception_id = next_exception_id
-      log.log.error(log_map(e.getMessage)+" [id:"+exception_id+"]")
-      log.log.debug("[id:"+exception_id+"]", e)
+      log.log.error(log_map(e.getMessage)+" (ref:"+exception_id+")")
+      log.log.debug("(ref:"+exception_id+")", e)
     }
   }
 
@@ -96,8 +96,8 @@ trait Logging {
         format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
       }
       val exception_id = next_exception_id
-      log.log.warn(log_map(m)+" [id:"+exception_id+"]")
-      log.log.debug("[id:"+exception_id+"]", e)
+      log.log.warn(log_map(m)+" (ref:"+exception_id+")")
+      log.log.debug("(ref:"+exception_id+")", e)
     }
   }
 
@@ -105,8 +105,8 @@ trait Logging {
     val l = log.log
     if( l.isWarnEnabled ) {
       val exception_id = next_exception_id
-      log.log.warn(log_map(e.getMessage)+" [id:"+exception_id+"]")
-      log.log.debug("[id:"+exception_id+"]", e)
+      log.log.warn(log_map(e.getMessage)+" (ref:"+exception_id+")")
+      log.log.debug("(ref:"+exception_id+")", e)
     }
   }
 
@@ -131,8 +131,8 @@ trait Logging {
         format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
       }
       val exception_id = next_exception_id
-      log.log.info(log_map(m)+" [id:"+exception_id+"]")
-      log.log.debug("[id:"+exception_id+"]", e)
+      log.log.info(log_map(m)+" (ref:"+exception_id+")")
+      log.log.debug("(ref:"+exception_id+")", e)
     }
   }
 
@@ -140,8 +140,8 @@ trait Logging {
     val l = log.log
     if( l.isInfoEnabled ) {
       val exception_id = next_exception_id
-      log.log.info(log_map(e.getMessage)+" [id:"+exception_id+"]")
-      log.log.debug("[id:"+exception_id+"]", e)
+      log.log.info(log_map(e.getMessage)+" (ref:"+exception_id+")")
+      log.log.debug("(ref:"+exception_id+")", e)
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961076&r1=961075&r2=961076&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:44:59 2010
@@ -32,20 +32,19 @@ import org.apache.activemq.transport.Com
 import java.io.IOException
 
 
-class StompProtocolException(msg:String) extends Exception(msg)
-
 object StompConstants {
-  val QUEUE_PREFIX = new AsciiBuffer("/queue/")
-  val TOPIC_PREFIX = new AsciiBuffer("/topic/")
+
+  val options = new ParserOptions
+  options.queuePrefix = new AsciiBuffer("/queue/")
+  options.topicPrefix = new AsciiBuffer("/topic/")
+  options.defaultDomain = Domain.QUEUE_DOMAIN
 
   implicit def toDestination(value:AsciiBuffer):Destination = {
-    if( value.startsWith(QUEUE_PREFIX) ) {
-      new SingleDestination(Domain.QUEUE_DOMAIN, value.slice(QUEUE_PREFIX.length, -QUEUE_PREFIX.length))
-    } else if( value.startsWith(TOPIC_PREFIX) ) {
-      new SingleDestination(Domain.TOPIC_DOMAIN, value.slice(TOPIC_PREFIX.length, -TOPIC_PREFIX.length))
-    } else {
-      throw new StompProtocolException("Invalid stomp destiantion name: "+value);
+    val d = DestinationParser.parse(value, options)
+    if( d==null ) {
+      throw new ProtocolException("Invalid stomp destiantion name: "+value);
     }
+    d
   }
 
 }
@@ -60,8 +59,7 @@ class StompProtocolHandler extends Proto
   
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
-  class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
-
+  class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer {
 
     val queue = StompProtocolHandler.this.dispatchQueue
     val session_manager = new DeliverySessionManager(outboundChannel, queue)
@@ -80,7 +78,10 @@ class StompProtocolHandler extends Proto
       val consumer = SimpleConsumer.this
       retain
 
-      def deliver(delivery:Delivery) = session.send(delivery)
+      def deliver(delivery:Delivery) =  {
+//        info("Delivering to consumer session")
+        session.send(delivery)
+      }
 
       def close = {
         session.close
@@ -107,6 +108,7 @@ class StompProtocolHandler extends Proto
           connection.onFailure(e)
         }
       });
+      delivery = outboundChannel.receive
     }
   }
 
@@ -115,7 +117,6 @@ class StompProtocolHandler extends Proto
   override def onTransportConnected() = {
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
-        info("got host.. resuming")
         this.host=host
         connection.transport.resumeRead
       }
@@ -132,7 +133,7 @@ class StompProtocolHandler extends Proto
         producerRoute=null
       }
       if( consumer!=null ) {
-        host.router.unbind(consumer.dest, consumer::Nil)
+        host.router.unbind(consumer.destination, consumer::Nil)
         consumer=null
       }
     }
@@ -188,8 +189,9 @@ class StompProtocolHandler extends Proto
   def on_stomp_send(frame:StompFrame) = {
     get(frame.headers, Headers.Send.DESTINATION) match {
       case Some(dest)=>
+        val destiantion:Destination = dest
         // create the producer route...
-        if( producerRoute==null || producerRoute.destination!= dest ) {
+        if( producerRoute==null || producerRoute.destination!=destiantion ) {
 
           // clean up the previous producer..
           if( producerRoute!=null ) {
@@ -212,7 +214,7 @@ class StompProtocolHandler extends Proto
 
           // don't process frames until we are connected..
           connection.transport.suspendRead
-          host.router.connect(dest, queue, producer) {
+          host.router.connect(destiantion, queue, producer) {
             (route) =>
               connection.transport.resumeRead
               producerRoute = route
@@ -238,19 +240,22 @@ class StompProtocolHandler extends Proto
         consumer.deliver(delivery)
       })
       delivery.release;
+    } else {
+      // info("Dropping message.  No consumers interested in message.")
     }
   }
 
   def on_stomp_subscribe(headers:HeaderMap) = {
     get(headers, Headers.Subscribe.DESTINATION) match {
       case Some(dest)=>
+        val destiantion:Destination = dest
         if( consumer !=null ) {
           die("Only one subscription supported.")
 
         } else {
-          info("subscribing to: %s", dest)
-          consumer = new SimpleConsumer(dest);
-          host.router.bind(dest, consumer :: Nil)
+          info("subscribing to: %s", destiantion)
+          consumer = new SimpleConsumer(destiantion);
+          host.router.bind(destiantion, consumer :: Nil)
           consumer.release
         }
       case None=>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961076&r1=961075&r2=961076&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 03:44:59 2010
@@ -36,7 +36,7 @@ object StompLoadClient {
   import StompLoadClient._
   implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
 
-  var producerSleep = 0;
+  var producerSleep = 1000*1000000;
   var consumerSleep = 0;
   var producers = 1;
   var consumers = 1;
@@ -265,7 +265,10 @@ object StompLoadClient {
           while (!done.get) {
             client.send(content)
             producerCounter.incrementAndGet();
-            Thread.sleep(producerSleep);
+            if(producerSleep > 0) {
+              client.flush
+              Thread.sleep(producerSleep);
+            }
             i += 1
           }
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961076&r1=961075&r2=961076&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul  7 03:44:59 2010
@@ -118,7 +118,7 @@ public class MultiWireFormatFactory impl
 
                 public void setStartPos(int pos) {
                     if( session!=null ) {
-                        session.setEndPos(pos);
+                        session.setStartPos(pos);
                     } else {
                         start=pos;
                     }



Mime
View raw message