Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 71145 invoked from network); 13 Mar 2009 16:18:33 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 13 Mar 2009 16:18:33 -0000 Received: (qmail 5016 invoked by uid 500); 13 Mar 2009 16:18:33 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 4958 invoked by uid 500); 13 Mar 2009 16:18:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 4949 invoked by uid 99); 13 Mar 2009 16:18:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2009 09:18:33 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Mar 2009 16:18:22 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0A00423888F1; Fri, 13 Mar 2009 16:18:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r753311 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/wireformat/ test/java/org/apache/activemq/br... Date: Fri, 13 Mar 2009 16:18:00 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090313161802.0A00423888F1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Mar 13 16:17:58 2009 New Revision: 753311 URL: http://svn.apache.org/viewvc?rev=753311&view=rev Log: - Added the abiliity to discriminate the protocol used by a connection. - The BrokerConnection now uses a ProtocolHandler once the protocol is discriminated. Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi Removed: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=753311&r1=753310&r2=753311&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Fri Mar 13 16:17:58 2009 @@ -22,11 +22,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.activemq.broker.MessageDelivery; import org.apache.activemq.dispatch.IDispatcher; -import org.apache.activemq.flow.Flow; -import org.apache.activemq.flow.IFlowLimiter; -import org.apache.activemq.flow.SizeLimiter; import org.apache.activemq.transport.DispatchableTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; @@ -77,7 +73,7 @@ protected void initialize() { } - protected final void write(final Object o) { + public final void write(final Object o) { if (blockingWriter==null) { try { transport.oneway(o); @@ -158,57 +154,8 @@ return inputResumeThreshold; } - protected interface ProtocolLimiter extends IFlowLimiter { - public void onProtocolCredit(int credit); - } - - protected class WindowLimiter extends SizeLimiter implements ProtocolLimiter { - final Flow flow; - final boolean clientMode; - private int available; - - public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) { - super(capacity, resumeThreshold); - this.clientMode = clientMode; - this.flow = flow; - } - - public void reserve(E elem) { - super.reserve(elem); -// if (!clientMode) { -// System.out.println(name + " Reserved " + this); -// } - } - - public void releaseReserved(E elem) { - super.reserve(elem); -// if (!clientMode) { -// System.out.println(name + " Released Reserved " + this); -// } - } - - protected void remove(int size) { - super.remove(size); - if (!clientMode) { - available += size; - if (available >= capacity - resumeThreshold) { - sendCredit(available); - available = 0; - } - } - } - - protected void sendCredit(int credit) { - throw new UnsupportedOperationException("Please override this method to provide and implemenation."); - } - - public void onProtocolCredit(int credit) { - remove(credit); - } - - public int getElementSize(MessageDelivery m) { - return m.getFlowLimiterSize(); - } + public Transport getTransport() { + return transport; } } Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri Mar 13 16:17:58 2009 @@ -0,0 +1,57 @@ +/** + * + */ +package org.apache.activemq; + +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.SizeLimiter; + +public class WindowLimiter extends SizeLimiter { + final Flow flow; + final boolean clientMode; + private int available; + + public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) { + super(capacity, resumeThreshold); + this.clientMode = clientMode; + this.flow = flow; + } + + public void reserve(E elem) { + super.reserve(elem); +// if (!clientMode) { +// System.out.println(name + " Reserved " + this); +// } + } + + public void releaseReserved(E elem) { + super.reserve(elem); +// if (!clientMode) { +// System.out.println(name + " Released Reserved " + this); +// } + } + + protected void remove(int size) { + super.remove(size); + if (!clientMode) { + available += size; + if (available >= capacity - resumeThreshold) { + sendCredit(available); + available = 0; + } + } + } + + protected void sendCredit(int credit) { + throw new UnsupportedOperationException("Please override this method to provide and implemenation."); + } + + public void onProtocolCredit(int credit) { + remove(credit); + } + + public int getElementSize(MessageDelivery m) { + return m.getFlowLimiterSize(); + } + } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java?rev=753311&r1=753310&r2=753311&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java Fri Mar 13 16:17:58 2009 @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.Connection; -import org.apache.activemq.broker.openwire.OpenwireBrokerConnection; import org.apache.activemq.dispatch.IDispatcher; import org.apache.activemq.transport.DispatchableTransportServer; import org.apache.activemq.transport.Transport; @@ -41,7 +40,8 @@ final HashMap queues = new HashMap(); private TransportServer transportServer; - private String uri; + private String bindUri; + private String connectUri; private String name; private IDispatcher dispatcher; private final AtomicBoolean stopping = new AtomicBoolean(); @@ -72,7 +72,7 @@ public final void start() throws Exception { dispatcher.start(); - transportServer = TransportFactory.bind(new URI(uri)); + transportServer = TransportFactory.bind(new URI(bindUri)); transportServer.setAcceptListener(this); if (transportServer instanceof DispatchableTransportServer) { ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher); @@ -85,7 +85,7 @@ } public void onAccept(final Transport transport) { - OpenwireBrokerConnection connection = new OpenwireBrokerConnection(); + BrokerConnection connection = new BrokerConnection(); connection.setBroker(this); connection.setTransport(transport); connection.setPriorityLevels(MAX_PRIORITY); @@ -115,12 +115,12 @@ this.dispatcher = dispatcher; } - public String getUri() { - return uri; + public String getBindUri() { + return bindUri; } - public void setUri(String uri) { - this.uri = uri; + public void setBindUri(String uri) { + this.bindUri = uri; } public boolean isStopping() { @@ -131,4 +131,14 @@ return router; } + + public String getConnectUri() { + return connectUri; + } + + + public void setConnectUri(String connectUri) { + this.connectUri = connectUri; + } + } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=753311&r1=753310&r2=753311&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Fri Mar 13 16:17:58 2009 @@ -17,10 +17,22 @@ package org.apache.activemq.broker; import org.apache.activemq.Connection; +import org.apache.activemq.Service; +import org.apache.activemq.broker.openwire.OpenwireProtocolHandler; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.wireformat.WireFormat; -abstract public class BrokerConnection extends Connection { +public class BrokerConnection extends Connection { protected Broker broker; + private ProtocolHandler protocolHandler; + + public interface ProtocolHandler extends Service { + public void setConnection(BrokerConnection connection); + public void onCommand(Object command); + public void onException(Exception error); + public void setWireFormat(WireFormat wf); + } public Broker getBroker() { return broker; @@ -30,10 +42,52 @@ this.broker = broker; } + @Override public boolean isStopping() { return super.isStopping() || broker.isStopping(); } + public void onCommand(Object command) { + if( protocolHandler!=null ) { + protocolHandler.onCommand(command); + } else { + try { + WireFormat wf = (WireFormat) command; + if( wf.getClass() == OpenWireFormat.class ) { + protocolHandler = new OpenwireProtocolHandler(); + protocolHandler.setConnection(this); + protocolHandler.setWireFormat(wf); + protocolHandler.start(); + } + } catch (Exception e) { + onException(e); + } + } + } + + @Override + public void onException(Exception error) { + if( protocolHandler!=null ) { + protocolHandler.onException(error); + } else { + error.printStackTrace(); + try { + stop(); + } catch (Exception ignore) { + } + } + } + + @Override + public void stop() throws Exception { + super.stop(); + if( protocolHandler!=null ) { + try { + protocolHandler.stop(); + } catch (Exception ignore) { + } + } + } } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=753311&r1=753310&r2=753311&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Mar 13 16:17:58 2009 @@ -34,7 +34,7 @@ public Destination getDestination() { if( destination == null ) { - destination = OpenwireBrokerConnection.convert(message.getDestination()); + destination = OpenwireProtocolHandler.convert(message.getDestination()); } return destination; } Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Mar 13 16:17:58 2009 @@ -0,0 +1,565 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.openwire; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +import org.apache.activemq.WindowLimiter; +import org.apache.activemq.broker.BrokerConnection; +import org.apache.activemq.broker.DeliveryTarget; +import org.apache.activemq.broker.Destination; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.Router; +import org.apache.activemq.broker.BrokerConnection.ProtocolHandler; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ControlCommand; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.FlushCommand; +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.LogicExpression; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NoLocalExpression; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowController; +import org.apache.activemq.flow.IFlowDrain; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.flow.ISinkController.FlowControllable; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.queue.SingleFlowRelay; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.state.CommandVisitor; +import org.apache.activemq.transport.WireFormatNegotiator; +import org.apache.activemq.wireformat.WireFormat; + +public class OpenwireProtocolHandler implements ProtocolHandler { + + protected final HashMap producers = new HashMap(); + protected final HashMap consumers = new HashMap(); + + protected final Object inboundMutex = new Object(); + protected IFlowController inboundController; + + protected BrokerConnection connection; + private OpenWireFormat wireFormat; + + public void start() throws Exception { + // Setup the inbound processing.. + final Flow flow = new Flow("broker-"+connection.getName()+"-inbound", false); + SizeLimiter limiter = new SizeLimiter(connection.getInputWindowSize(), connection.getInputResumeThreshold()); + inboundController = new FlowController(new FlowControllableAdapter() { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + route(controller, elem); + } + + public String toString() { + return flow.getFlowName(); + } + }, flow, limiter, inboundMutex); + } + + public void stop() throws Exception { + } + + public void onCommand(Object o) { + + final Command command = (Command) o; + boolean responseRequired = command.isResponseRequired(); + try { + command.visit(new CommandVisitor() { + + // ///////////////////////////////////////////////////////////////// + // Methods that keep track of the client state + // ///////////////////////////////////////////////////////////////// + public Response processAddConnection(ConnectionInfo info) throws Exception { + return ack(command); + } + + public Response processAddSession(SessionInfo info) throws Exception { + return ack(command); + } + + public Response processAddProducer(ProducerInfo info) throws Exception { + producers.put(info.getProducerId(), new ProducerContext(info)); + return ack(command); + } + + public Response processAddConsumer(ConsumerInfo info) throws Exception { + ConsumerContext ctx = new ConsumerContext(info); + consumers.put(info.getConsumerId(), ctx); + connection.getBroker().getRouter().bind(convert(info.getDestination()), ctx); + return ack(command); + } + + public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception { + return ack(command); + } + + public Response processRemoveSession(SessionId info, long arg1) throws Exception { + return ack(command); + } + + public Response processRemoveProducer(ProducerId info) throws Exception { + producers.remove(info); + return ack(command); + } + + public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception { + return ack(command); + } + + // ///////////////////////////////////////////////////////////////// + // Message Processing Methods. + // ///////////////////////////////////////////////////////////////// + public Response processMessage(Message info) throws Exception { + ProducerId producerId = info.getProducerId(); + ProducerContext producerContext = producers.get(producerId); + + OpenWireMessageDelivery md = new OpenWireMessageDelivery(info); + + // Only producers that are not using a window will block, + // and if it blocks. + // yes we block the connection's read thread. yes other + // sessions will not get + // serviced while we block here. The producer is depending + // on TCP flow + // control to slow him down so we have to stop ready from + // the socket at this + // point. + while (!producerContext.controller.offer(md, null)) { + producerContext.controller.waitForFlowUnblock(); + } + return null; + } + + public Response processMessageAck(MessageAck info) throws Exception { + ConsumerContext ctx = consumers.get(info.getConsumerId()); + ctx.ack(info); + return ack(command); + } + + // Only used when client prefetch is set to zero. + public Response processMessagePull(MessagePull info) throws Exception { + return ack(command); + } + + // ///////////////////////////////////////////////////////////////// + // Control Methods + // ///////////////////////////////////////////////////////////////// + public Response processWireFormat(WireFormatInfo info) throws Exception { + + // Negotiate the openwire encoding options. + WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1); + wfn.sendWireFormat(); + wfn.negociate(info); + + // Now that the encoding is negotiated.. let the client know the details about this + // broker. + BrokerInfo brokerInfo = new BrokerInfo(); + brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName())); + brokerInfo.setBrokerName(connection.getBroker().getName()); + brokerInfo.setBrokerURL(connection.getBroker().getBindUri()); + connection.write(brokerInfo); + return ack(command); + } + + public Response processShutdown(ShutdownInfo info) throws Exception { + return ack(command); + } + + public Response processKeepAlive(KeepAliveInfo info) throws Exception { + return ack(command); + } + + public Response processFlush(FlushCommand info) throws Exception { + return ack(command); + } + + public Response processConnectionControl(ConnectionControl info) throws Exception { + return ack(command); + } + + public Response processConnectionError(ConnectionError info) throws Exception { + return ack(command); + } + + public Response processConsumerControl(ConsumerControl info) throws Exception { + return ack(command); + } + + // ///////////////////////////////////////////////////////////////// + // Methods for server management + // ///////////////////////////////////////////////////////////////// + public Response processAddDestination(DestinationInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processRemoveDestination(DestinationInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processControlCommand(ControlCommand info) throws Exception { + throw new UnsupportedOperationException(); + } + + // ///////////////////////////////////////////////////////////////// + // Methods for transaction management + // ///////////////////////////////////////////////////////////////// + public Response processBeginTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processEndTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processForgetTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processRecoverTransactions(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + // ///////////////////////////////////////////////////////////////// + // Methods for cluster operations + // These commands are sent to the broker when it's acting like a + // client to another broker. + // ///////////////////////////////////////////////////////////////// + public Response processBrokerInfo(BrokerInfo info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processMessageDispatch(MessageDispatch info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception { + throw new UnsupportedOperationException(); + } + + public Response processProducerAck(ProducerAck info) throws Exception { + return ack(command); + } + + }); + } catch (Exception e) { + if (responseRequired) { + ExceptionResponse response = new ExceptionResponse(e); + response.setCorrelationId(command.getCommandId()); + connection.write(response); + } else { + connection.onException(e); + } + + } + } + + public void onException(Exception error) { + if( !connection.isStopping() ) { + error.printStackTrace(); + new Thread(){ + @Override + public void run() { + try { + connection.stop(); + } catch (Exception ignore) { + } + } + }.start(); + } + } + + // ///////////////////////////////////////////////////////////////// + // Internal Support Methods + // ///////////////////////////////////////////////////////////////// + + private Response ack(Command command) { + if (command.isResponseRequired()) { + Response rc = new Response(); + rc.setCorrelationId(command.getCommandId()); + connection.write(rc); + } + return null; + } + + static class FlowControllableAdapter implements FlowControllable { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + } + + public IFlowSink getFlowSink() { + return null; + } + + public IFlowSource getFlowSource() { + return null; + } + } + + class ProducerContext { + + private IFlowController controller; + private String name; + + public ProducerContext(final ProducerInfo info) { + this.name = info.getProducerId().toString(); + + // Openwire only uses credit windows at the producer level for + // producers that request the feature. + if (info.getWindowSize() > 0) { + final Flow flow = new Flow("broker-"+name+"-inbound", false); + WindowLimiter limiter = new WindowLimiter(false, flow, info.getWindowSize(), info.getWindowSize() / 2) { + @Override + protected void sendCredit(int credit) { + ProducerAck ack = new ProducerAck(info.getProducerId(), credit); + connection.write(ack); + } + }; + + controller = new FlowController(new FlowControllableAdapter() { + public void flowElemAccepted(ISourceController controller, MessageDelivery elem) { + route(controller, elem); + } + + public String toString() { + return flow.getFlowName(); + } + }, flow, limiter, inboundMutex); + } else { + controller = inboundController; + } + } + } + + class ConsumerContext implements DeliveryTarget { + + private final ConsumerInfo info; + private String name; + private BooleanExpression selector; + + private SingleFlowRelay queue; + public WindowLimiter limiter; + + public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException { + this.info = info; + this.name = info.getConsumerId().toString(); + selector = parseSelector(info); + + Flow flow = new Flow("broker-"+name+"-outbound", false); + limiter = new WindowLimiter(true, flow, info.getPrefetchSize(), info.getPrefetchSize()/2) { + public int getElementSize(MessageDelivery m) { + return 1; + } + }; + queue = new SingleFlowRelay(flow, flow.getFlowName(), limiter); + queue.setDrain(new IFlowDrain() { + public void drain(final MessageDelivery message, ISourceController controller) { + Message msg = message.asType(Message.class); + MessageDispatch md = new MessageDispatch(); + md.setConsumerId(info.getConsumerId()); + md.setMessage(msg); + md.setDestination(msg.getDestination()); + connection.write(md); + }; + }); + } + + public void ack(MessageAck info) { + synchronized(queue) { + limiter.onProtocolCredit(info.getMessageCount()); + } + } + + public IFlowSink getSink() { + return queue; + } + + public boolean match(MessageDelivery message) { + Message msg = message.asType(Message.class); + if (msg == null) { + return false; + } + + MessageEvaluationContext selectorContext = new MessageEvaluationContext(); + selectorContext.setMessageReference(msg); + selectorContext.setDestination(info.getDestination()); + try { + return (selector == null || selector.matches(selectorContext)); + } catch (JMSException e) { + e.printStackTrace(); + return false; + } + } + + } + + protected void route(ISourceController controller, MessageDelivery elem) { + // TODO: + // Consider doing some caching of this target list. Most producers + // always send to + // the same destination. + Collection targets = connection.getBroker().getRouter().route(elem); + + final Message message = ((OpenWireMessageDelivery) elem).getMessage(); + if (targets != null) { + + if (message.isResponseRequired()) { + // We need to ack the message once we ensure we won't loose it. + // We know we won't loose it once it's persisted or delivered to + // a consumer + // Setup a callback to get notifed once one of those happens. + if (message.isPersistent()) { + elem.setCompletionCallback(new Runnable() { + public void run() { + ack(message); + } + }); + } else { + // Let the client know the broker got the message. + ack(message); + } + } + + // Deliver the message to all the targets.. + for (DeliveryTarget dt : targets) { + if (dt.match(elem)) { + dt.getSink().add(elem, controller); + } + } + + } else { + // Let the client know we got the message even though there + // were no valid targets to deliver the message to. + if (message.isResponseRequired()) { + ack(message); + } + } + controller.elementDispatched(elem); + } + + static public Destination convert(ActiveMQDestination dest) { + if (dest.isComposite()) { + ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations(); + ArrayList d = new ArrayList(); + for (int i = 0; i < compositeDestinations.length; i++) { + d.add(convert(compositeDestinations[i])); + } + return new Destination.MultiDestination(d); + } + AsciiBuffer domain; + if (dest.isQueue()) { + domain = Router.QUEUE_DOMAIN; + } + if (dest.isTopic()) { + domain = Router.TOPIC_DOMAIN; + } else { + throw new IllegalArgumentException("Unsupported domain type: " + dest); + } + return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName())); + } + + private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException { + BooleanExpression rc = null; + if (info.getSelector() != null) { + rc = SelectorParser.parse(info.getSelector()); + } + if (info.isNoLocal()) { + if (rc == null) { + rc = new NoLocalExpression(info.getConsumerId().getConnectionId()); + } else { + rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc); + } + } + if (info.getAdditionalPredicate() != null) { + if (rc == null) { + rc = info.getAdditionalPredicate(); + } else { + rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc); + } + } + return rc; + } + + public BrokerConnection getConnection() { + return connection; + } + + public void setConnection(BrokerConnection connection) { + this.connection = connection; + } + + public void setWireFormat(WireFormat wireFormat) { + this.wireFormat = (OpenWireFormat) wireFormat; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/ConcatInputStream.java Fri Mar 13 16:17:58 2009 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.wireformat; + +import java.io.IOException; +import java.io.InputStream; + +public class ConcatInputStream extends InputStream { + + private InputStream first; + private final InputStream second; + + public ConcatInputStream(InputStream first, InputStream second) { + this.first = first; + this.second = second; + } + + @Override + public int read() throws IOException { + if( first!=null ) { + int rc = first.read(); + if( rc >= 0 ) { + return rc; + } + first = null; + } + return second.read(); + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableOpenWireFormatFactory.java Fri Mar 13 16:17:58 2009 @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.wireformat; + +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.util.ByteSequence; + +public class DiscriminatableOpenWireFormatFactory extends OpenWireFormatFactory implements DiscriminatableWireFormatFactory { + + private static final byte MAGIC[] = new byte[] {1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q'}; + + public boolean matchesWireformatHeader(ByteSequence byteSequence) { + if( byteSequence.length == 4+MAGIC.length ) { + for( int i=0; i < MAGIC.length; i++ ) { + if( byteSequence.data[i+4] != MAGIC[i] ) { + return false; + } + } + return true; + } + return false; + } + + public int maxWireformatHeaderLength() { + return 4+MAGIC.length; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Fri Mar 13 16:17:58 2009 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.wireformat; + +import org.apache.activemq.transport.stomp.StompWireFormatFactory; +import org.apache.activemq.util.ByteSequence; + +public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory { + + public boolean matchesWireformatHeader(ByteSequence byteSequence) { + return false; + } + + public int maxWireformatHeaderLength() { + return 100; + } + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableWireFormatFactory.java Fri Mar 13 16:17:58 2009 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.wireformat; + +import org.apache.activemq.util.ByteSequence; + +/** + * This should actually get merged into the WireFormatFactory class. But to avoid change to much in the core right, + * now it's an additional interface. + * + */ +public interface DiscriminatableWireFormatFactory extends WireFormatFactory { + + int maxWireformatHeaderLength(); + + boolean matchesWireformatHeader(ByteSequence byteSequence); + +} Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Fri Mar 13 16:17:58 2009 @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.wireformat; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.ByteSequence; + +public class MultiWireFormatFactory implements WireFormatFactory{ + + static class MultiWireFormat implements WireFormat { + + ArrayList wireFormatFactories = new ArrayList(); + WireFormat wireFormat; + int maxHeaderLength; + + public int getVersion() { + return 0; + } + public boolean inReceive() { + return wireFormat.inReceive(); + } + public void setVersion(int version) { + wireFormat.setVersion(version); + } + + private ByteArrayOutputStream baos = new ByteArrayOutputStream(); + private ByteArrayInputStream peeked; + + public Object unmarshal(DataInput in) throws IOException { + + while( wireFormat == null ) { + + int readByte = ((InputStream)in).read(); + if( readByte < 0 ) { + throw new EOFException(); + } + baos.write(readByte); + + // Try to discriminate what we have read so far. + for (DiscriminatableWireFormatFactory wff : wireFormatFactories) { + if( wff.matchesWireformatHeader(baos.toByteSequence()) ) { + wireFormat = wff.createWireFormat(); + peeked = new ByteArrayInputStream(baos.toByteSequence()); + return wireFormat; + } + } + + if( baos.size() >= maxHeaderLength ) { + throw new IOException("Could not discriminate the protocol."); + } + } + + // If we have some peeked data we need to feed that back.. Only happens + // for the first few bytes of the protocol header. + if( peeked!=null ) { + in = new DataInputStream( new ConcatInputStream(peeked, (InputStream)in) ); + Object rc = wireFormat.unmarshal(in); + if( peeked.available() <= 0 ) { + peeked=null; + } + return rc; + } + + return wireFormat.unmarshal(in); + } + + + public void marshal(Object command, DataOutput out) throws IOException { + wireFormat.marshal(command, out); + } + + public ByteSequence marshal(Object command) throws IOException { + throw new UnsupportedOperationException(); + } + public Object unmarshal(ByteSequence packet) throws IOException { + throw new UnsupportedOperationException(); + } + public ArrayList getWireFormatFactories() { + return wireFormatFactories; + } + public void setWireFormatFactories(ArrayList wireFormatFactories) { + this.wireFormatFactories = wireFormatFactories; + maxHeaderLength=0; + for (DiscriminatableWireFormatFactory wff : wireFormatFactories) { + maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength()); + } + } + } + + public WireFormat createWireFormat() { + MultiWireFormat rc = new MultiWireFormat(); + ArrayList wireFormatFactories = new ArrayList(); + wireFormatFactories.add(new DiscriminatableStompWireFormatFactory()); + wireFormatFactories.add(new DiscriminatableOpenWireFormatFactory()); + rc.setWireFormatFactories(wireFormatFactories); + return rc; + } + +} Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=753311&r1=753310&r2=753311&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Fri Mar 13 16:17:58 2009 @@ -59,8 +59,10 @@ // set to force marshalling even in the NON tcp case. protected boolean forceMarshalling = false; - protected String sendBrokerURI; - protected String receiveBrokerURI; + protected String sendBrokerBindURI; + protected String receiveBrokerBindURI; + protected String sendBrokerConnectURI; + protected String receiveBrokerConnectURI; // Set's the number of threads to use: protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors(); @@ -101,16 +103,20 @@ dispatcher = createDispatcher(); dispatcher.start(); if (tcp) { - sendBrokerURI = "tcp://localhost:10000"; - receiveBrokerURI = "tcp://localhost:20000"; + sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi"; + receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi"; + sendBrokerConnectURI = "tcp://localhost:10000"; + receiveBrokerConnectURI = "tcp://localhost:20000"; } else { if (forceMarshalling) { - sendBrokerURI = "pipe://SendBroker"; - receiveBrokerURI = "pipe://ReceiveBroker"; + sendBrokerBindURI = "pipe://SendBroker"; + receiveBrokerBindURI = "pipe://ReceiveBroker"; } else { - sendBrokerURI = "pipe://SendBroker"; - receiveBrokerURI = "pipe://ReceiveBroker"; + sendBrokerBindURI = "pipe://SendBroker"; + receiveBrokerBindURI = "pipe://ReceiveBroker"; } + sendBrokerConnectURI = sendBrokerBindURI; + receiveBrokerConnectURI = receiveBrokerBindURI; } } @@ -370,12 +376,12 @@ private void createConnections() throws IOException, URISyntaxException { if (multibroker) { - sendBroker = createBroker("SendBroker", sendBrokerURI); - rcvBroker = createBroker("RcvBroker", receiveBrokerURI); + sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI); + rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI); brokers.add(sendBroker); brokers.add(rcvBroker); } else { - sendBroker = rcvBroker = createBroker("Broker", sendBrokerURI); + sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI); brokers.add(sendBroker); } @@ -425,7 +431,7 @@ } } }; - consumer.setUri(new URI(rcvBroker.getUri())); + consumer.setUri(new URI(rcvBroker.getConnectUri())); consumer.setDestination(destination); consumer.setName("consumer" + (i + 1)); consumer.setTotalConsumerRate(totalConsumerRate); @@ -442,7 +448,7 @@ } } }; - producer.setUri(new URI(sendBroker.getUri())); + producer.setUri(new URI(sendBroker.getConnectUri())); producer.setProducerId(id + 1); producer.setName("producer" + (id + 1)); producer.setDestination(destination); @@ -463,10 +469,11 @@ return queue; } - private Broker createBroker(String name, String uri) { + private Broker createBroker(String name, String bindURI, String connectUri) { Broker broker = new Broker(); broker.setName(name); - broker.setUri(uri); + broker.setBindUri(bindURI); + broker.setConnectUri(connectUri); broker.setDispatcher(dispatcher); return broker; } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753311&r1=753310&r2=753311&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Fri Mar 13 16:17:58 2009 @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.Connection; +import org.apache.activemq.WindowLimiter; import org.apache.activemq.broker.Destination; import org.apache.activemq.broker.MessageDelivery; import org.apache.activemq.broker.Router; Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753311&r1=753310&r2=753311&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Fri Mar 13 16:17:58 2009 @@ -11,6 +11,7 @@ import javax.jms.JMSException; import org.apache.activemq.Connection; +import org.apache.activemq.WindowLimiter; import org.apache.activemq.broker.Destination; import org.apache.activemq.broker.MessageDelivery; import org.apache.activemq.broker.Router; Added: activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi?rev=753311&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi (added) +++ activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/multi Fri Mar 13 16:17:58 2009 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.wireformat.MultiWireFormatFactory