activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r883892 - in /activemq/sandbox/activemq-apollo: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-flow/src/main/java/org/apache/activemq/flow/ activemq-openwi...
Date Tue, 24 Nov 2009 21:55:24 GMT
Author: chirino
Date: Tue Nov 24 21:55:22 2009
New Revision: 883892

URL: http://svn.apache.org/viewvc?rev=883892&view=rev
Log:
converted a benchmark to junit 4, some more progress on the stomp protocol

Modified:
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
    activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
    activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-apollo/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
    activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
Tue Nov 24 21:55:22 2009
@@ -266,6 +266,7 @@
 
     /**
      * @return A buffer representation of the message to be stored in the store.
+     * @throws  
      */
     protected abstract Buffer getStoreEncoded();
 

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
Tue Nov 24 21:55:22 2009
@@ -136,6 +136,8 @@
          */
         public boolean autoCreateDestination();
 
+        public boolean isPersistent();
+
     }
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
Tue Nov 24 21:55:22 2009
@@ -27,7 +27,9 @@
 import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.ExclusiveQueue;
 import org.apache.activemq.queue.IFlowQueue;
+import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.util.IntrospectionSupport;
 
 class TopicSubscription implements BrokerSubscription, DeliveryTarget {
 
@@ -47,6 +49,11 @@
         this.destination = destination;
     }
 
+    @Override
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+    
     /*
      * (non-Javadoc)
      * 
@@ -71,9 +78,7 @@
 
     public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException
{
         if (this.connectedSub == null) {
-        	
-        	// Ok this is not ideal.  Perhaps not all topic subscriptions want this level of
service.
-        	if( USE_PERSISTENT_QUEUES ) {
+        	if( subscription.isPersistent() ) {
         		queue = createPersistentQueue(subscription);
         	} else {
         		queue = createNonPersistentQueue(subscription);
@@ -88,12 +93,17 @@
         }
     }
 
-    private IFlowQueue<MessageDelivery> createNonPersistentQueue(ConsumerContext subscription)
{
+    private IFlowQueue<MessageDelivery> createNonPersistentQueue(final ConsumerContext
subscription) {
 		Flow flow = new Flow(subscription.getResourceName(), false);
 		String name = subscription.getResourceName();
 		IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100,
50);
 		ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow,
name, limiter);
 		queue.setDispatcher(host.getBroker().getDispatcher());
+		queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
+            public void drain(MessageDelivery elem, ISourceController<MessageDelivery>
controller) {
+                subscription.add(elem, controller);
+            }
+        });
 		return queue;
 	}
 

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
Tue Nov 24 21:55:22 2009
@@ -25,8 +25,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.apollo.broker.Router;
@@ -38,10 +36,14 @@
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.lang.String.*;
 
-public abstract class BrokerTestBase extends TestCase {
+public abstract class BrokerTestBase {
 
-    protected static final int PERFORMANCE_SAMPLES = 30000;
+    protected static final int PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES",
"3"));
 
     protected static final int IO_WORK_AMOUNT = 0;
     protected static final int FANIN_COUNT = 10;
@@ -92,8 +94,8 @@
     final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
     final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
 
-    @Override
-    protected void setUp() throws Exception {
+    @Before
+    public void setUp() throws Exception {
         dispatcher = createDispatcher();
         dispatcher.start();
         
@@ -115,7 +117,17 @@
             }
         }
     }
-
+    
+    String name;
+    
+    private void setName(String name) {
+        if( this.name==null ) {
+            this.name = name;
+        }
+    }
+    private String getName() {
+        return name;
+    }
     protected String getBrokerWireFormat() {
         return "multi";
     }
@@ -126,7 +138,9 @@
         return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY,
asyncThreadPoolSize);
     }
 
-    public void test_1_1_0() throws Exception {
+    @Test
+    public void benchmark_1_1_0() throws Exception {
+        setName("1 producer -> 1 destination -> 0 consumers");
         if (ptp) {
             return;
         }
@@ -144,7 +158,9 @@
         }
     }
 
-    public void test_1_1_1() throws Exception {
+    @Test
+    public void benchmark_1_1_1() throws Exception {
+        setName("1 producer -> 1 destination -> 1 consumers");
         producerCount = 1;
         destCount = 1;
         consumerCount = 1;
@@ -160,7 +176,9 @@
         }
     }
 
-    public void test_10_1_10() throws Exception {
+    @Test
+    public void benchmark_10_1_10() throws Exception {
+        setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT,
FANOUT_COUNT));
         producerCount = FANIN_COUNT;
         consumerCount = FANOUT_COUNT;
         destCount = 1;
@@ -176,7 +194,9 @@
         }
     }
 
-    public void test_10_1_1() throws Exception {
+    @Test
+    public void benchmark_10_1_1() throws Exception {
+        setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT));
         producerCount = FANIN_COUNT;
         destCount = 1;
         consumerCount = 1;
@@ -192,7 +212,9 @@
         }
     }
 
-    public void test_1_1_10() throws Exception {
+    @Test
+    public void benchmark_1_1_10() throws Exception {
+        setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT));
         producerCount = 1;
         destCount = 1;
         consumerCount = FANOUT_COUNT;
@@ -208,7 +230,9 @@
         }
     }
 
-    public void test_2_2_2() throws Exception {
+    @Test
+    public void benchmark_2_2_2() throws Exception {
+        setName(format("2 producer -> 2 destination -> 2 consumers"));
         producerCount = 2;
         destCount = 2;
         consumerCount = 2;
@@ -224,7 +248,9 @@
         }
     }
 
-    public void test_10_10_10() throws Exception {
+    @Test
+    public void benchmark_10_10_10() throws Exception {
+        setName(format("10 producers -> 10 destinations -> 10 consumers"));
         producerCount = 10;
         destCount = 10;
         consumerCount = 10;
@@ -247,7 +273,9 @@
      * 
      * @throws Exception
      */
-    public void test_2_2_2_SlowConsumer() throws Exception {
+    @Test
+    public void benchmark_2_2_2_SlowConsumer() throws Exception {
+        setName(format("2 producer -> 2 destination -> 2 slow consumers"));
         producerCount = 2;
         destCount = 2;
         consumerCount = 2;
@@ -264,7 +292,9 @@
         }
     }
 
-    public void test_2_2_2_Selector() throws Exception {
+    @Test
+    public void benchmark_2_2_2_Selector() throws Exception {
+        setName(format("2 producer -> 2 destination -> 2 selector consumers"));
         producerCount = 2;
         destCount = 2;
         consumerCount = 2;
@@ -293,8 +323,10 @@
      * 
      * @throws Exception
      */
-    public void test_2_1_1_HighPriorityProducer() throws Exception {
+    @Test
+    public void benchmark_2_1_1_HighPriorityProducer() throws Exception {
 
+        setName(format("1 high and 1 normal priority producer -> 1 destination -> 1
consumer"));
         producerCount = 2;
         destCount = 1;
         consumerCount = 1;
@@ -332,7 +364,9 @@
      * 
      * @throws Exception
      */
-    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+    @Test
+    public void benchmark_2_1_1_MixedHighPriorityProducer() throws Exception {
+        setName(format("1 high/mixed and 1 normal priority producer -> 1 destination ->
1 consumer"));
         producerCount = 2;
         destCount = 1;
         consumerCount = 1;
@@ -526,3 +560,4 @@
     }
 
 }
+;
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
Tue Nov 24 21:55:22 2009
@@ -18,6 +18,8 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.util.IntrospectionSupport;
+
 /**
  */
 final public class Flow {
@@ -85,7 +87,6 @@
     }
 
     public String toString() {
-        return getFlowName();
+        return IntrospectionSupport.toString(this);
     }
-
 }

Modified: activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
Tue Nov 24 21:55:22 2009
@@ -23,12 +23,14 @@
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.activemq.flow.IFlowLimiter.UnThrottleListener;
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
  */
 public class FlowController<E> implements IFlowController<E> {
 
     private static final Executor DEFAULT_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    
     // Sinks that are blocking us.
     private final HashSet<ISinkController<?>> blockingSinks = new HashSet<ISinkController<?>>();
 
@@ -85,6 +87,10 @@
             public final void onUnthrottled() {
                 FlowController.this.onUnthrottled();
             }
+            @Override
+            public String toString() {
+                return "DEFAULT";
+            }
         };
     }
 
@@ -100,6 +106,10 @@
         }
     }
 
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+
     public final IFlowLimiter<E> getLimiter() {
         return limiter;
     }
@@ -412,10 +422,6 @@
         }
     }
 
-    public String toString() {
-        return name;
-    }
-
     public void setExecutor(Executor executor) {
         synchronized (mutex) {
             this.executor = executor;

Modified: activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
Tue Nov 24 21:55:22 2009
@@ -130,6 +130,6 @@
     }
 
     public String toString() {
-        return "SizeLimiter " + capacity + "/" + resumeThreshold + ", s=" + size + " res="
+ reserved + ", thr= " + throttled;
+        return "{ capacity: " + capacity + ", resumeThreshold: " + resumeThreshold + ", size:
" + size + ", reserved: " + reserved + ", throttled: " + throttled+" }";
     }
 }

Modified: activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Tue Nov 24 21:55:22 2009
@@ -84,7 +84,6 @@
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowLimiter;
 import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
@@ -995,6 +994,10 @@
             super.close();
             consumers.remove(info.getConsumerId());
         }
+
+        public boolean isPersistent() {
+            return true;
+        }
     }
 
     private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException
{

Modified: activemq/sandbox/activemq-apollo/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
Tue Nov 24 21:55:22 2009
@@ -22,8 +22,6 @@
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.flow.ISinkController;
-import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
 /**

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java
Tue Nov 24 21:55:22 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.stomp;
 
+import java.io.IOException;
+
 import org.apache.activemq.apollo.broker.BrokerMessageDelivery;
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -141,8 +143,11 @@
     }
     
     public Buffer getStoreEncoded() {
-        // TODO:
-        throw new UnsupportedOperationException();
+        try {
+            return StompWireFormat.INSTANCE.marshal(frame);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
     
 

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java
Tue Nov 24 21:55:22 2009
@@ -51,6 +51,7 @@
 import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.buffer.ByteArrayOutputStream;
@@ -84,7 +85,12 @@
     private final FrameTranslator translator = new DefaultFrameTranslator();
     private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/stomp/frametranslator/");
     private SingleFlowRelay<MessageDelivery> outboundQueue;
-
+    
+    public static final IdGenerator CLIENT_IDS = new IdGenerator();
+    
+    private final String clientId = CLIENT_IDS.generateId();
+    private long messageId;
+    
     private HashMap<AsciiBuffer, ConsumerContext> allSentMessageIds = new HashMap<AsciiBuffer,
ConsumerContext>();
     private Router router;
 
@@ -112,7 +118,9 @@
                 Destination destination = translator(frame).convert(dest);
 
                 frame.setAction(MESSAGE);
+                frame.getHeaders().put(Stomp.Headers.Message.MESSAGE_ID, ascii(clientId+":"+(messageId++)));
                 StompMessageDelivery md = new StompMessageDelivery(frame, destination);
+                
                 inboundContext.onReceive(md);
             }
         });
@@ -121,6 +129,7 @@
                 AsciiBuffer subscriptionId = frame.get(Subscribe.ID);
                 ConsumerContext ctx = new ConsumerContext(subscriptionId.toString(), frame);
                 consumers.put(ctx.stompDestination, ctx);
+                ctx.start();
                 ack(frame);
             }
         });
@@ -325,6 +334,9 @@
                 queue = outboundQueue;
             }
             
+        }
+        
+        public void start() throws Exception {
             BrokerSubscription sub = router.getVirtualHost().createSubscription(this);
             sub.connect(this);
         }
@@ -503,6 +515,10 @@
         public boolean isExclusive() {
             return false;
         }
+
+        public boolean isPersistent() {
+            return false;
+        }
     }
 
     private void sendError(String message) {

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
Tue Nov 24 21:55:22 2009
@@ -46,6 +46,8 @@
  */
 public class StompWireFormat implements WireFormat {
     
+    public static final StompWireFormat INSTANCE = new StompWireFormat();
+    
     private static final int MAX_COMMAND_LENGTH = 1024;
     private static final int MAX_HEADER_LENGTH = 1024 * 10;
     private static final int MAX_HEADERS = 1000;

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
Tue Nov 24 21:55:22 2009
@@ -3,6 +3,9 @@
 import org.apache.activemq.broker.BrokerTestBase;
 import org.apache.activemq.broker.RemoteConsumer;
 import org.apache.activemq.broker.RemoteProducer;
+import org.junit.Test;
+
+import static org.junit.Assume.*;
 
 public class StompBrokerTest extends BrokerTestBase {
 
@@ -20,4 +23,10 @@
     protected String getRemoteWireFormat() {
          return "stomp";
     }
+    
+    @Test
+    public void benchmark_1_1_0() throws Exception {
+        assumeTrue(false);
+        super.benchmark_1_1_0();
+    }
 }

Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=883892&r1=883891&r2=883892&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
(original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
Tue Nov 24 21:55:22 2009
@@ -27,7 +27,6 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
 
 
@@ -263,43 +262,69 @@
     }
 
     public static String toString(Object target, Class<?> stopClass, Map<String,
Object> overrideFields) {
-        LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
-        addFields(target, target.getClass(), stopClass, map);
-        if (overrideFields != null) {
-        	for(String key : overrideFields.keySet()) {
-        	    Object value = overrideFields.get(key);
-        	    map.put(key, value);
-        	}
-
-        }
-        StringBuffer buffer = new StringBuffer(simpleName(target.getClass()));
-        buffer.append(" {");
-        Set entrySet = map.entrySet();
-        boolean first = true;
-        for (Iterator iter = entrySet.iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry)iter.next();
-            if (first) {
-                first = false;
+        try {
+            LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
+            addFields(target, target.getClass(), stopClass, map);
+            if (overrideFields != null) {
+            	for(String key : overrideFields.keySet()) {
+            	    Object value = overrideFields.get(key);
+            	    map.put(key, value);
+            	}
+            }
+           
+            boolean useMultiLine=false;
+            LinkedHashMap<String, String> props = new LinkedHashMap<String, String>();
+            for (Entry<String, Object> entry : map.entrySet()) {
+                String key = entry.getKey();
+                String value = null;
+                if( entry.getValue() !=null ) {
+                    value = entry.getValue().toString();
+                    if( value!=null && ( value.indexOf('\n')>=0 || (key.length()+value.length())>70
) ) {
+                        useMultiLine=true;
+                    }
+                }
+                props.put(key, value);
+            }
+            
+            StringBuffer buffer = new StringBuffer();
+            if( useMultiLine) {
+                buffer.append("{\n");
+                boolean first = true;
+                for (Entry<String, String> entry : props.entrySet()) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        buffer.append(",\n");
+                    }
+                    buffer.append("  ");
+                    buffer.append(entry.getKey());
+                    buffer.append(": ");
+                    buffer.append(entry.getValue());
+                }
+                buffer.append("\n}");
             } else {
-                buffer.append(", ");
+                buffer.append("{");
+                boolean first = true;
+                for (Entry<String, String> entry : props.entrySet()) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        buffer.append(", ");
+                    }
+                    buffer.append(entry.getKey());
+                    buffer.append(": ");
+                    buffer.append(StringSupport.indent(entry.getValue(), 2));
+                }
+                buffer.append("}");
             }
-            buffer.append(entry.getKey());
-            buffer.append(" = ");
-            appendToString(buffer, entry.getValue());
-        }
-        buffer.append("}");
-        return buffer.toString();
-    }
-
-    protected static void appendToString(StringBuffer buffer, Object value) {
-//        if (value instanceof ActiveMQDestination) {
-//            ActiveMQDestination destination = (ActiveMQDestination)value;
-//            buffer.append(destination.getQualifiedName());
-//        } else {
-            buffer.append(value);
-//        }
+            return buffer.toString();
+        } catch (Throwable e) {
+            e.printStackTrace();
+            return "Could not toString: "+e.toString();
+        }
     }
 
+
     public static String simpleName(Class<?> clazz) {
         String name = clazz.getName();
         int p = name.lastIndexOf(".");
@@ -318,8 +343,7 @@
         Field[] fields = startClass.getDeclaredFields();
         for (int i = 0; i < fields.length; i++) {
             Field field = fields[i];
-            if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())
-                || Modifier.isPrivate(field.getModifiers())) {
+            if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())
) {
                 continue;
             }
 



Mime
View raw message