activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1086830 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/ apollo-broker/src/main/scala/org/apache/activemq/apollo/bro...
Date Wed, 30 Mar 2011 03:49:32 GMT
Author: chirino
Date: Wed Mar 30 03:49:31 2011
New Revision: 1086830

URL: http://svn.apache.org/viewvc?rev=1086830&view=rev
Log:
Simpler assertions and error logging.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
Wed Mar 30 03:49:31 2011
@@ -20,10 +20,10 @@ import _root_.java.io.{IOException}
 import _root_.java.lang.{String}
 import org.fusesource.hawtdispatch._
 import protocol.{ProtocolHandler}
-import org.apache.activemq.apollo.util.{Log, BaseService}
 import org.apache.activemq.apollo.filter.BooleanExpression
 import org.apache.activemq.apollo.transport.{TransportListener, DefaultTransportListener,
Transport}
 import org.apache.activemq.apollo.dto.{DestinationDTO, ConnectionStatusDTO}
+import org.apache.activemq.apollo.util.{Dispatched, Log, BaseService}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -32,7 +32,7 @@ object Connection extends Log
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class Connection() extends BaseService  {
+abstract class Connection() extends BaseService with Dispatched {
   import Connection._
 
   val dispatch_queue = createQueue()
@@ -99,20 +99,24 @@ class BrokerConnection(var connector: Co
   override def toString = "id: "+id.toString
 
   protected override  def _start(on_completed:Runnable) = {
-    connector.broker.connection_log.info("Client connected from: %s", transport.getRemoteAddress)
     protocol_handler.set_connection(this);
     super._start(on_completed)
   }
 
   protected override def _stop(on_completed:Runnable) = {
-    connector.broker.connection_log.info("Client disconnected from: %s", transport.getRemoteAddress)
     connector.stopped(this)
     super._stop(on_completed)
   }
 
-  protected override def on_transport_connected() = protocol_handler.on_transport_connected
+  protected override def on_transport_connected() = {
+    connector.broker.connection_log.info("connected: %s", transport.getRemoteAddress)
+    protocol_handler.on_transport_connected
+  }
 
-  protected override def on_transport_disconnected() = protocol_handler.on_transport_disconnected
+  protected override def on_transport_disconnected() = {
+    connector.broker.connection_log.info("disconnected: %s", transport.getRemoteAddress)
+    protocol_handler.on_transport_disconnected
+  }
 
   protected override def on_transport_command(command: Object) = {
     try {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Wed Mar 30 03:49:31 2011
@@ -151,7 +151,7 @@ class OverflowSink[T](val downstream:Sin
 class MutableSink[T] extends Sink[T] {
 
   var refiller:Runnable = NOOP
-  private var _downstream:Option[Sink[T]] =_
+  private var _downstream:Option[Sink[T]] = None
 
   def downstream_=(value: Option[Sink[T]]) {
     _downstream.foreach(d => d.refiller = NOOP )
@@ -292,7 +292,7 @@ class Session[T](val producer_queue:Disp
 
 
   override def full = {
-    assert(getCurrentQueue eq producer_queue)
+    assert(producer_queue.isExecuting)
     _full
   }
 
@@ -310,7 +310,7 @@ class Session[T](val producer_queue:Disp
   def close = {
     if( !closed ) {
       closed=true
-      assert(getCurrentQueue eq producer_queue)
+      assert(producer_queue.isExecuting)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
Wed Mar 30 03:49:31 2011
@@ -136,10 +136,9 @@ class JettyWebServer(val broker:Broker) 
 
   var server:Server = _
 
-
   override def toString: String = "jetty webserver"
 
-  protected val dispatch_queue = createQueue()
+  val dispatch_queue = createQueue()
 
   protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
     this.synchronized {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
Wed Mar 30 03:49:31 2011
@@ -18,11 +18,11 @@ package org.apache.activemq.apollo.broke
 
 import java.io.{IOException}
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import org.apache.activemq.apollo.util.ClassFinder
 import org.apache.activemq.apollo.broker.store.MessageRecord
 import org.apache.activemq.apollo.transport._
 import org.apache.activemq.apollo.broker.{Delivery, Message, BrokerConnection}
 import org.apache.activemq.apollo.dto.ConnectionStatusDTO
+import org.apache.activemq.apollo.util.{Log, ClassFinder}
 
 /**
  * <p>
@@ -58,11 +58,13 @@ trait Protocol extends ProtocolCodecFact
 
 }
 
+object ProtocolHandler extends Log
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait ProtocolHandler {
+  import ProtocolHandler._
 
   def protocol:String
 
@@ -75,6 +77,7 @@ trait ProtocolHandler {
   def create_connection_status = new ConnectionStatusDTO
 
   def on_transport_failure(error:IOException) = {
+    trace(error)
     connection.stop()
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed Mar 30 03:49:31 2011
@@ -261,7 +261,7 @@ class StompProtocolHandler extends Proto
       val session = session_manager.open(producer.dispatch_queue)
 
       def close = {
-        assert(getCurrentQueue == producer.dispatch_queue)
+        assert(producer.dispatch_queue.isExecuting)
         if( !closed ) {
           closed = true
           if( browser ) {

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
Wed Mar 30 03:49:31 2011
@@ -28,7 +28,7 @@ object BaseService extends Log
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait BaseService extends Service {
+trait BaseService extends Service with Dispatched {
 
   import BaseService._
 
@@ -58,8 +58,6 @@ trait BaseService extends Service {
   protected class STOPPING extends State with CallbackSupport { override def is_stopping
= true  }
   protected class STOPPED extends State { override def is_stopped = true  }
 
-  protected def dispatch_queue:DispatchQueue
-
   final def start() = start(null)
   final def stop() = stop(null)
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Wed Mar 30 03:49:31 2011
@@ -28,5 +28,10 @@ import org.fusesource.hawtdispatch._
 trait Dispatched {
   def dispatch_queue:DispatchQueue
 
-  protected def assert_dispatched = assert( getCurrentQueue == dispatch_queue )
+  protected def assert_executing = assert( dispatch_queue.isExecuting,
+    "Dispatch queue '%s' was not executing, (currently executing: %s)".format(
+      Option(dispatch_queue.getLabel).getOrElse(""),
+      Option(getCurrentThreadQueue).map(_.getLabel).getOrElse("None") )
+  )
+
 }
\ No newline at end of file



Mime
View raw message