activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r785887 - in /activemq/sandbox/activemq-flow: activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire...
Date Thu, 18 Jun 2009 03:36:42 GMT
Author: cmacnaug
Date: Thu Jun 18 03:36:41 2009
New Revision: 785887

URL: http://svn.apache.org/viewvc?rev=785887&view=rev
Log:
Updating OpenWireProtocolHandler to use a count-based window limiter rather than a size-based one.

This allows the protocol handler to work with the activemq-client.

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
Thu Jun 18 03:36:41 2009
@@ -478,7 +478,6 @@
 
     private Broker createBroker(String name, String bindURI, String connectUri) throws Exception
{
         Broker broker = new Broker();
-        broker.setDefaultVirtualHost(new VirtualHost(name));
         broker.addTransportServer(TransportFactory.bind(new URI(bindURI)));
         broker.addConnectUri(connectUri);
         broker.setDispatcher(dispatcher);

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
Thu Jun 18 03:36:41 2009
@@ -193,7 +193,7 @@
                     }
                 }
             }
-            
+
             if (deleteAllMessages) {
                 getJournal().start();
                 journal.delete();
@@ -247,7 +247,6 @@
         try {
             open();
 
-            
             store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())),
null);
         } finally {
             indexLock.writeLock().unlock();
@@ -441,7 +440,7 @@
         try {
             indexLock.writeLock().lock();
             long start = System.currentTimeMillis();
-            
+
             try {
                 if (!opened.get()) {
                     return;
@@ -718,11 +717,13 @@
 
         public final void rollback() {
             try {
-                if (updateCount > 1) {
-                    journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
-                }
                 if (tx != null) {
+                    if (updateCount > 1) {
+                        journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
+                    }
                     tx.rollback();
+                } else {
+                    throw new IllegalStateException("Not in Transaction");
                 }
             } catch (IOException e) {
                 throw new FatalStoreException(e);

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Thu Jun 18 03:36:41 2009
@@ -103,6 +103,7 @@
 
     public OpenwireProtocolHandler() {
         setStoreWireFormat(new OpenWireFormat());
+        
         visitor = new CommandVisitor() {
 
             // /////////////////////////////////////////////////////////////////
@@ -333,10 +334,7 @@
         Command command = (Command) o;
         boolean responseRequired = command.isResponseRequired();
         try {
-            
             command.visit(visitor);
-            
-            
         } catch (Exception e) {
             if (responseRequired) {
                 ExceptionResponse response = new ExceptionResponse(e);
@@ -449,7 +447,7 @@
             limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(),
info.getPrefetchSize() / 2) {
                 @Override
                 public int getElementSize(MessageDelivery m) {
-                    return m.getFlowLimiterSize();
+                    return 1;
                 }
             };
 

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
Thu Jun 18 03:36:41 2009
@@ -39,40 +39,48 @@
     private ConsumerInfo consumerInfo;
 
     private Message lastMessage;
-    
+
     protected void initialize() {
+        inputWindowSize = 1000;
+        inputResumeThreshold = 500;
         // Setup the input processing..
-        final Flow flow = new Flow("client-"+name+"-inbound", false);
-        inputResumeThreshold = inputWindowSize/2;
+        final Flow flow = new Flow("client-" + name + "-inbound", false);
+        inputResumeThreshold = inputWindowSize / 2;
         WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false,
flow, inputWindowSize, inputResumeThreshold) {
             @Override
             protected void sendCredit(int credit) {
                 MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit,
MessageAck.STANDARD_ACK_TYPE);
                 write(ack);
             }
+
+            public int getElementSize(MessageDelivery md) {
+                return 1;
+            }
         };
         inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>()
{
             public void flowElemAccepted(ISourceController<MessageDelivery> controller,
MessageDelivery elem) {
                 messageReceived(controller, elem);
             }
+
             public String toString() {
                 return flow.getFlowName();
             }
+
             public IFlowResource getFlowResource() {
                 return null;
             }
         }, flow, limiter, inboundMutex);
         inboundController.setExecutor(getDispatcher().createPriorityExecutor(getDispatcher().getDispatchPriorities()
- 1));
-        
+
     }
-    
+
     protected void setupSubscription() throws Exception, IOException {
-        if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+        if (destination.getDomain().equals(Router.QUEUE_DOMAIN)) {
             activemqDestination = new ActiveMQQueue(destination.getName().toString());
         } else {
             activemqDestination = new ActiveMQTopic(destination.getName().toString());
         }
-        
+
         connectionInfo = createConnectionInfo(name);
         transport.oneway(connectionInfo);
         sessionInfo = createSessionInfo(connectionInfo);
@@ -81,7 +89,7 @@
         consumerInfo.setPrefetchSize(inputWindowSize);
         transport.oneway(consumerInfo);
     }
-    
+
     public void onCommand(Object command) {
         try {
             if (command.getClass() == WireFormatInfo.class) {



Mime
View raw message