activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1075429 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/br...
Date Mon, 28 Feb 2011 17:13:02 GMT
Author: chirino
Date: Mon Feb 28 17:13:01 2011
New Revision: 1075429

URL: http://svn.apache.org/viewvc?rev=1075429&view=rev
Log:
The Retained trait has been removed from the hawtdispatch dispatch objects.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
    activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java
    activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
Mon Feb 28 17:13:01 2011
@@ -59,7 +59,7 @@ object HelperTrait {
   implicit def to_int(value:DatabaseEntry):Int = to_int(value.getData)
 
 
-
+  @SerialVersionUID(1)
   class LongComparator extends Comparator[Array[Byte]] with Serializable {
 
     def compare(o1: Array[Byte], o2: Array[Byte]) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
Mon Feb 28 17:13:01 2011
@@ -98,14 +98,12 @@ class BrokerConnection(val connector: Co
   override def toString = "id: "+id.toString
 
   protected override  def _start(on_completed:Runnable) = {
-    connector.dispatch_queue.retain
     protocol_handler.set_connection(this);
     super._start(on_completed)
   }
 
   protected override def _stop(on_completed:Runnable) = {
     connector.stopped(this)
-    connector.dispatch_queue.release
     super._stop(on_completed)
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
Mon Feb 28 17:13:01 2011
@@ -99,8 +99,6 @@ class Connector(val broker:Broker, val i
 
       broker.init_dispatch_queue(connection.dispatch_queue)
 
-      // We release when it gets removed form the connections list.
-      connection.dispatch_queue.retain
       connections.put(connection.id, connection)
       info("Client connected from: %s", connection.transport.getRemoteAddress)
 
@@ -180,7 +178,6 @@ class Connector(val broker:Broker, val i
     val at_limit = at_connection_limit
     if( connections.remove(connection.id).isDefined ) {
       info("Client disconnected from: %s", connection.transport.getRemoteAddress)
-      connection.dispatch_queue.release
       if( at_limit ) {
         transportServer.resume
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Mon Feb 28 17:13:01 2011
@@ -60,12 +60,10 @@ class Queue(val router: LocalRouter, val
   dispatch_queue {
     debug("created queue for: " + binding.label)
   }
-  setDisposer(^ {
-    ack_source.release
-    dispatch_queue.release
-    session_manager.release
-  })
 
+  override def dispose: Unit = {
+    ack_source.cancel
+  }
 
   val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, Boolean, StoreUOW)](), dispatch_queue)
   ack_source.setEventHandler(^ {drain_acks});
@@ -946,9 +944,9 @@ class QueueEntry(val queue:Queue, val se
 
     def store = {
       delivery.uow.enqueue(toQueueEntryRecord)
-      delivery.uow.on_complete(^{
+      delivery.uow.on_complete {
         queue.swap_out_completes_source.merge(this)
-      })
+      }
     }
 
     override def swap_out(asap:Boolean) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Mon Feb 28 17:13:01 2011
@@ -71,12 +71,6 @@ abstract class DeliveryProducerRoute(val
 
   import DeliveryProducerRoute._
 
-  // Retain the queue while we are retained.
-  dispatch_queue.retain
-  setDisposer(^{
-    dispatch_queue.release
-  })
-
   var targets = List[DeliverySession]()
 
   def connected() = dispatch_queue {
@@ -180,9 +174,9 @@ abstract class DeliveryProducerRoute(val
     if (pendingAck != null) {
       if (delivery.uow != null) {
         val ack = pendingAck
-        delivery.uow.setDisposer(^ {
+        delivery.uow.on_complete {
           ack(true, null)
-        })
+        }
 
       } else {
         pendingAck(true, null)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Mon Feb 28 17:13:01 2011
@@ -147,7 +147,7 @@ object MapSink {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SinkMux[T](val downstream:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) extends BaseRetained {
+class SinkMux[T](val downstream:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) {
 
   var sessions = List[Session[T]]()
   var session_max_credits = 1024*32;
@@ -160,7 +160,6 @@ class SinkMux[T](val downstream:Sink[T],
       val session = event._1
       val value = event._2
       session.credit_adder.merge(sizer.size(value));
-      session.credit_adder.release
     }
   }
   // As messages are delivered, and we credit the sessions,
@@ -168,12 +167,6 @@ class SinkMux[T](val downstream:Sink[T],
   // need to have a refiller action.
   overflow.refiller = NOOP
 
-  queue.retain
-  setDisposer(^{
-    source.release
-    queue.release
-  })
-
   // use a event aggregating source to coalesce multiple events from the same thread.
   // all the sessions send to the same source.
   val source = createSource(new ListEventAggregator[(Session[T],T)](), queue)
@@ -222,15 +215,11 @@ class Session[T](val producer_queue:Disp
   private def sizer = mux.sizer
   private def downstream = mux.source
 
-  // retain since the producer will be using this source to send messages
-  // to the consumer
-  downstream.retain
-
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
-  credit_adder.setEventHandler(^{
+  credit_adder.onEvent{
     add_credits(credit_adder.getData.intValue)
-  });
+  }
   credit_adder.resume
 
   private var closed = false
@@ -266,7 +255,6 @@ class Session[T](val producer_queue:Disp
     if( _full || closed ) {
       false
     } else {
-      credit_adder.retain
       add_credits(-sizer.size(value))
       downstream.merge((this, value))
       true
@@ -277,8 +265,6 @@ class Session[T](val producer_queue:Disp
     if( !closed ) {
       closed=true
       assert(getCurrentQueue eq producer_queue)
-      credit_adder.release
-      downstream.release
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Mon Feb 28 17:13:01 2011
@@ -80,10 +80,22 @@ trait DelayingStoreSupport extends Store
     val uow_id:Int = next_batch_id.getAndIncrement
     var actions = Map[Long, MessageAction]()
 
-    var completeListeners = ListBuffer[Runnable]()
+    var completed = false
+    var completeListeners = ListBuffer[() => Unit]()
     var disableDelay = false
 
-    def on_complete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+    def on_complete(callback: =>Unit) = {
+      if( this.synchronized {
+        if( completed ) {
+          true
+        } else {
+          completeListeners += ( ()=> callback  )
+          false
+        }
+      }) {
+        callback
+      }
+    }
 
     def complete_asap() = this.synchronized { disableDelay=true }
 
@@ -155,9 +167,7 @@ trait DelayingStoreSupport extends Store
 
     def onPerformed() = this.synchronized {
       commit_latency_counter += System.nanoTime-dispose_start
-      completeListeners.foreach { x=>
-        x.run
-      }
+      completeListeners.foreach(_())
       super.dispose
     }
   }
@@ -170,7 +180,7 @@ trait DelayingStoreSupport extends Store
     } else {
       // TODO: protect against this causing a 2nd flush.
       delayedUOWs.put(action.uow.uow_id, action.uow)
-      action.uow.on_complete(^{ cb })
+      action.uow.on_complete( cb )
       flush(action.uow.uow_id)
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
Mon Feb 28 17:13:01 2011
@@ -69,6 +69,6 @@ trait StoreUOW extends Retained {
    * The specified callback is executed once the UOW
    * is completed.
    */
-  def on_complete(callback: Runnable)
+  def on_complete(callback: =>Unit)
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
Mon Feb 28 17:13:01 2011
@@ -139,7 +139,10 @@ abstract class StoreBenchmarkSupport ext
     }
 
     val tracker = new TaskTracker()
-    tracker.release(batch)
+    val task = tracker.task("uow complete")
+    batch.on_complete(task.run)
+    batch.release
+
     msgKeys.foreach { msg_key =>
       store.flush_message(msg_key) {}
     }
@@ -168,7 +171,8 @@ abstract class StoreBenchmarkSupport ext
       batch.enqueue(entry(queue, seq, message))
 
       val latch = new CountDownLatch(1)
-      batch.setDisposer(^{latch.countDown} )
+      batch.on_complete(latch.countDown)
+
       batch.release
       store.flush_message(message) {}
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
Mon Feb 28 17:13:01 2011
@@ -123,7 +123,11 @@ abstract class StoreFunSuiteSupport exte
     }
 
     val tracker = new TaskTracker()
-    tracker.release(batch)
+
+    val task = tracker.task("uow complete")
+    batch.on_complete(task.run)
+    batch.release
+
     msg_keys.foreach { msgKey =>
       store.flush_message(msgKey) {}
     }
@@ -180,7 +184,10 @@ abstract class StoreFunSuiteSupport exte
     batch.enqueue(entry(A, 1, m1))
 
     val tracker = new TaskTracker()
-    tracker.release(batch)
+    val task = tracker.task("uow complete")
+    batch.on_complete(task.run)
+    batch.release
+
     expect(false) {
       tracker.await(3, TimeUnit.SECONDS)
     }
@@ -197,7 +204,9 @@ abstract class StoreFunSuiteSupport exte
     batch.enqueue(entry(A, 1, m1))
 
     val tracker = new TaskTracker()
-    tracker.release(batch)
+    val task = tracker.task("uow complete")
+    batch.on_complete(task.run)
+    batch.release
 
     store.flush_message(m1) {}
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Mon Feb 28 17:13:01 2011
@@ -1014,9 +1014,9 @@ class StompProtocolHandler extends Proto
 
       queue.foreach{ _(uow) }
       if( uow!=null ) {
-        uow.on_complete(^{
+        uow.on_complete {
           on_complete
-        })
+        }
         uow.release
       } else {
         on_complete

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Mon Feb 28 17:13:01 2011
@@ -352,13 +352,7 @@ public class TcpTransport extends JavaBa
     }
 
     public void setDispatchQueue(DispatchQueue queue) {
-        if (dispatchQueue != null) {
-            dispatchQueue.release();
-        }
         this.dispatchQueue = queue;
-        if (dispatchQueue != null) {
-            dispatchQueue.retain();
-        }
     }
 
     public void _start(Runnable onCompleted) {
@@ -376,7 +370,7 @@ public class TcpTransport extends JavaBa
                             trace("connected.");
                             channel.finishConnect();
                             readSource.setCancelHandler(null);
-                            readSource.release();
+                            readSource.cancel();
                             readSource=null;
                             socketState = new CONNECTED();
                             onConnected();
@@ -465,18 +459,15 @@ public class TcpTransport extends JavaBa
     }
 
     private void dispose() {
-
         if( readSource!=null ) {
-            readSource.release();
+            readSource.cancel();
             readSource=null;
         }
 
         if( writeSource!=null ) {
-            writeSource.release();
+            writeSource.cancel();
             writeSource=null;
         }
-        
-        dispatchQueue.release();
         this.codec = null;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
Mon Feb 28 17:13:01 2011
@@ -166,9 +166,21 @@ public class TcpTransportServer implemen
     public void stop() throws Exception {
         stop(null);
     }
-    public void stop(Runnable onCompleted) throws Exception {
-        acceptSource.setDisposer(onCompleted);
-        acceptSource.release();
+    public void stop(final Runnable onCompleted) throws Exception {
+        if( acceptSource.isCanceled() ) {
+            onCompleted.run();
+        } else {
+            acceptSource.setCancelHandler(new Runnable() {
+                public void run() {
+                    try {
+                        channel.close();
+                    } catch (IOException e) {
+                    }
+                    onCompleted.run();
+                }
+            });
+            acceptSource.cancel();
+        }
     }
 
     public URI getBindURI() {

Modified: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java
Mon Feb 28 17:13:01 2011
@@ -59,13 +59,7 @@ public class PipeTransport implements Tr
         return dispatchQueue;
     }
     public void setDispatchQueue(DispatchQueue queue) {
-        if( dispatchQueue!=null ) {
-            dispatchQueue.release();
-        }
         this.dispatchQueue = queue;
-        if( dispatchQueue!=null ) {
-            dispatchQueue.retain();
-        }
     }
 
     public void start() throws Exception {
@@ -135,9 +129,8 @@ public class PipeTransport implements Tr
             peer.dispatchSource.merge(EOF_TOKEN);
         }
         if( dispatchSource!=null ) {
-            dispatchSource.setDisposer(onCompleted);
-            dispatchSource.release();
-            dispatchSource = null;
+            dispatchSource.setCancelHandler(onCompleted);
+            dispatchSource.cancel();
         }
         setDispatchQueue(null);
     }

Modified: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java?rev=1075429&r1=1075428&r2=1075429&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransportServer.java
Mon Feb 28 17:13:01 2011
@@ -97,8 +97,8 @@ public class PipeTransportServer impleme
     }
     public void stop(Runnable onCompleted) throws Exception {
         PipeTransportFactory.unbind(this);
-        acceptSource.setDisposer(onCompleted);
-        acceptSource.release();
+        acceptSource.setCancelHandler(onCompleted);
+        acceptSource.cancel();
     }
 
     public void setConnectURI(URI connectURI) {



Mime
View raw message