Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 50917 invoked from network); 18 Jun 2009 03:36:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 18 Jun 2009 03:36:57 -0000 Received: (qmail 12360 invoked by uid 500); 18 Jun 2009 03:37:08 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 12313 invoked by uid 500); 18 Jun 2009 03:37:08 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 12304 invoked by uid 99); 18 Jun 2009 03:37:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jun 2009 03:37:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jun 2009 03:37:04 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 00F98238888F; Thu, 18 Jun 2009 03:36:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r785887 - in /activemq/sandbox/activemq-flow: activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire... Date: Thu, 18 Jun 2009 03:36:42 -0000 To: commits@activemq.apache.org From: cmacnaug@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090618033643.00F98238888F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cmacnaug Date: Thu Jun 18 03:36:41 2009 New Revision: 785887 URL: http://svn.apache.org/viewvc?rev=785887&view=rev Log: Updating OpenWireProtocolHandler to use count based window limiter rather than size based. This allows the the protocol handler to work with the activemq-client. Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=785887&r1=785886&r2=785887&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original) +++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu Jun 18 03:36:41 2009 @@ -478,7 +478,6 @@ private Broker createBroker(String name, String bindURI, String connectUri) throws Exception { Broker broker = new Broker(); - broker.setDefaultVirtualHost(new VirtualHost(name)); broker.addTransportServer(TransportFactory.bind(new URI(bindURI))); broker.addConnectUri(connectUri); broker.setDispatcher(dispatcher); Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785887&r1=785886&r2=785887&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original) +++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Thu Jun 18 03:36:41 2009 @@ -193,7 +193,7 @@ } } } - + if (deleteAllMessages) { getJournal().start(); journal.delete(); @@ -247,7 +247,6 @@ try { open(); - store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())), null); } finally { indexLock.writeLock().unlock(); @@ -441,7 +440,7 @@ try { indexLock.writeLock().lock(); long start = System.currentTimeMillis(); - + try { if (!opened.get()) { return; @@ -718,11 +717,13 @@ public final void rollback() { try { - if (updateCount > 1) { - journal.write(CANCEL_UNIT_OF_WORK_DATA, false); - } if (tx != null) { + if (updateCount > 1) { + journal.write(CANCEL_UNIT_OF_WORK_DATA, false); + } tx.rollback(); + } else { + throw new IllegalStateException("Not in Transaction"); } } catch (IOException e) { throw new FatalStoreException(e); Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=785887&r1=785886&r2=785887&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 18 03:36:41 2009 @@ -103,6 +103,7 @@ public OpenwireProtocolHandler() { setStoreWireFormat(new OpenWireFormat()); + visitor = new CommandVisitor() { // ///////////////////////////////////////////////////////////////// @@ -333,10 +334,7 @@ Command command = (Command) o; boolean responseRequired = command.isResponseRequired(); try { - command.visit(visitor); - - } catch (Exception e) { if (responseRequired) { ExceptionResponse response = new ExceptionResponse(e); @@ -449,7 +447,7 @@ limiter = new WindowLimiter(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) { @Override public int getElementSize(MessageDelivery m) { - return m.getFlowLimiterSize(); + return 1; } }; Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java?rev=785887&r1=785886&r2=785887&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java Thu Jun 18 03:36:41 2009 @@ -39,40 +39,48 @@ private ConsumerInfo consumerInfo; private Message lastMessage; - + protected void initialize() { + inputWindowSize = 1000; + inputResumeThreshold = 500; // Setup the input processing.. - final Flow flow = new Flow("client-"+name+"-inbound", false); - inputResumeThreshold = inputWindowSize/2; + final Flow flow = new Flow("client-" + name + "-inbound", false); + inputResumeThreshold = inputWindowSize / 2; WindowLimiter limiter = new WindowLimiter(false, flow, inputWindowSize, inputResumeThreshold) { @Override protected void sendCredit(int credit) { MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE); write(ack); } + + public int getElementSize(MessageDelivery md) { + return 1; + } }; inboundController = new FlowController(new FlowControllable() { public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { messageReceived(controller, elem); } + public String toString() { return flow.getFlowName(); } + public IFlowResource getFlowResource() { return null; } }, flow, limiter, inboundMutex); inboundController.setExecutor(getDispatcher().createPriorityExecutor(getDispatcher().getDispatchPriorities() - 1)); - + } - + protected void setupSubscription() throws Exception, IOException { - if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) { + if (destination.getDomain().equals(Router.QUEUE_DOMAIN)) { activemqDestination = new ActiveMQQueue(destination.getName().toString()); } else { activemqDestination = new ActiveMQTopic(destination.getName().toString()); } - + connectionInfo = createConnectionInfo(name); transport.oneway(connectionInfo); sessionInfo = createSessionInfo(connectionInfo); @@ -81,7 +89,7 @@ consumerInfo.setPrefetchSize(inputWindowSize); transport.oneway(consumerInfo); } - + public void onCommand(Object command) { try { if (command.getClass() == WireFormatInfo.class) {