activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r753030 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/broker/openwire/
Date Thu, 12 Mar 2009 21:50:43 GMT
Author: chirino
Date: Thu Mar 12 21:50:42 2009
New Revision: 753030

URL: http://svn.apache.org/viewvc?rev=753030&view=rev
Log:
Better flow names to make debugging easier.

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
Thu Mar 12 21:50:42 2009
@@ -308,18 +308,16 @@
     
     class ProducerContext {
 
-        private final ProducerInfo info;
         private IFlowController<MessageDelivery> controller;
         private String name;
 
         public ProducerContext(final ProducerInfo info) {
-            this.info = info;
             this.name = info.getProducerId().toString();
 
             // Openwire only uses credit windows at the producer level for
             // producers that request the feature.
             if (info.getWindowSize() > 0) {
-                Flow flow = new Flow(info.getProducerId().toString(), false);
+                final Flow flow = new Flow("broker-"+name+"-inbound", false);
                 WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false,
flow, info.getWindowSize(), info.getWindowSize() / 2) {
                     @Override
                     protected void sendCredit(int credit) {
@@ -334,7 +332,7 @@
                     }
 
                     public String toString() {
-                        return name;
+                        return flow.getFlowName();
                     }
                 }, flow, limiter, inboundMutex);
             } else {
@@ -357,13 +355,13 @@
             this.name = info.getConsumerId().toString();
             selector = parseSelector(info);
 
-            Flow flow = new Flow(name, false);
+            Flow flow = new Flow("broker-"+name+"-outbound", false);
             limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(),
info.getPrefetchSize() / 2) {
                 public int getElementSize(MessageDelivery m) {
                     return 1;
                 }
             };
-            queue = new SingleFlowRelay<MessageDelivery>(flow, name + "-outbound",
limiter);
+            queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(),
limiter);
             queue.setDrain(new IFlowDrain<MessageDelivery>() {
                 public void drain(final MessageDelivery message, ISourceController<MessageDelivery>
controller) {
                     Message msg = message.asType(Message.class);
@@ -450,7 +448,7 @@
     protected void initialize() {
 
         // Setup the inbound processing..
-        Flow flow = new Flow(name, false);
+        final Flow flow = new Flow("broker-"+name+"-inbound", false);
         SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize,
inputResumeThreshold);
         inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter()
{
             public void flowElemAccepted(ISourceController<MessageDelivery> controller,
MessageDelivery elem) {
@@ -458,7 +456,7 @@
             }
 
             public String toString() {
-                return name;
+                return flow.getFlowName();
             }
         }, flow, limiter, inboundMutex);
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
Thu Mar 12 21:50:42 2009
@@ -325,8 +325,7 @@
         setUnThrottleListener();
 
         if (!blockedSources.contains(source)) {
-            // System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" +
-            // source + "]");
+//            System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" + source + "]");
             blockedSources.add(source);
             source.onFlowBlock(this);
         }
@@ -391,9 +390,7 @@
                     String was = Thread.currentThread().getName();
                     try {
                         for (ISourceController<E> source : blockedSources) {
-                            // System.out.println("UNBLOCKING: SINK[" +
-                            // FlowController.this + "], SOURCE[" + source +
-                            // "]");
+//                            System.out.println("UNBLOCKING: SINK[" + FlowController.this
+ "], SOURCE[" + source + "]");
                             source.onFlowResume(FlowController.this);
                         }
                         for (FlowUnblockListener<E> listener : unblockListeners) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
Thu Mar 12 21:50:42 2009
@@ -42,9 +42,14 @@
         return info;
     }
 
+    
     public static ConnectionInfo createConnectionInfo() throws Exception {
+        return createConnectionInfo("connection:"+ (++idGenerator));
+    }
+
+    public static ConnectionInfo createConnectionInfo(String name) throws Exception {
         ConnectionInfo info = new ConnectionInfo();
-        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+        info.setConnectionId(new ConnectionId(name));
         info.setClientId(info.getConnectionId().getValue());
         return info;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
Thu Mar 12 21:50:42 2009
@@ -13,7 +13,6 @@
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
@@ -74,7 +73,7 @@
             activemqDestination = new ActiveMQTopic(destination.getName().toString());
         }
         
-        connectionInfo = createConnectionInfo();
+        connectionInfo = createConnectionInfo(name);
         transport.oneway(connectionInfo);
         sessionInfo = createSessionInfo(connectionInfo);
         transport.oneway(sessionInfo);
@@ -87,7 +86,7 @@
     protected void initialize() {
         
         // Setup the input processing..
-        Flow flow = new Flow(name, false);
+        final Flow flow = new Flow("client-"+name+"-inbound", false);
         WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false,
flow, inputWindowSize, inputResumeThreshold) {
             protected void sendCredit(int credit) {
                 MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit,
MessageAck.STANDARD_ACK_TYPE);
@@ -99,7 +98,7 @@
                 messageReceived(controller, elem);
             }
             public String toString() {
-                return name;
+                return flow.getFlowName();
             }
             public IFlowSink<MessageDelivery> getFlowSink() {
                 return null;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Thu Mar 12 21:50:42 2009
@@ -88,7 +88,7 @@
             activemqDestination = new ActiveMQTopic(destination.getName().toString());
         }
         
-        connectionInfo = createConnectionInfo();
+        connectionInfo = createConnectionInfo(name);
         transport.oneway(connectionInfo);
         sessionInfo = createSessionInfo(connectionInfo);
         transport.oneway(sessionInfo);
@@ -101,10 +101,10 @@
     }
     
     protected void initialize() {
-        Flow flow = new Flow(name, false);
+        Flow flow = new Flow("client-"+name+"-outbound", false);
         outputResumeThreshold = outputWindowSize/2;
         outboundLimiter = new WindowLimiter<MessageDelivery>(true, flow, outputWindowSize,
outputResumeThreshold);
-        outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, name + "-outbound",
outboundLimiter);
+        outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(),
outboundLimiter);
         
         outboundController = outboundQueue.getFlowController(flow);
         outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {



Mime
View raw message