Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 10931 invoked from network); 12 Mar 2009 20:33:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Mar 2009 20:33:12 -0000 Received: (qmail 72522 invoked by uid 500); 12 Mar 2009 20:33:12 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 72496 invoked by uid 500); 12 Mar 2009 20:33:12 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 72487 invoked by uid 99); 12 Mar 2009 20:33:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Mar 2009 13:33:12 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Mar 2009 20:33:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3960023889B7; Thu, 12 Mar 2009 20:32:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r753003 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/openwire/ Date: Thu, 12 Mar 2009 20:32:50 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090312203251.3960023889B7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Mar 12 20:32:50 2009 New Revision: 753003 URL: http://svn.apache.org/viewvc?rev=753003&view=rev Log: Consumer actually gets messages now... just need to debug the flow control now. Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java Removed: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=753003&r1=753002&r2=753003&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java Thu Mar 12 20:32:50 2009 @@ -21,8 +21,8 @@ queues.get(name).addConsumer(deliveryTarget); } - public Collection route(MessageDelivery name) { - Queue queue = queues.get(name); + public Collection route(MessageDelivery delivery) { + Queue queue = queues.get(delivery.getDestination().getName()); if( queue!=null ) { ArrayList rc = new ArrayList(1); rc.add(queue); Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=753003&r1=753002&r2=753003&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java Thu Mar 12 20:32:50 2009 @@ -25,8 +25,8 @@ targets.add(target); } - public Collection route(MessageDelivery name) { - return topicsTargets.get(name); + public Collection route(MessageDelivery delivery) { + return topicsTargets.get(delivery.getDestination().getName()); } } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753003&r1=753002&r2=753003&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 20:32:50 2009 @@ -60,7 +60,6 @@ import org.apache.activemq.queue.SingleFlowRelay; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.state.CommandVisitor; -import org.apache.activemq.transport.DispatchableTransport; public class OpenwireBrokerConnection extends BrokerConnection { @@ -363,25 +362,20 @@ } }; queue = new SingleFlowRelay(flow, name + "-outbound", limiter); - if (transport instanceof DispatchableTransport) { - queue.setDrain(new IFlowDrain() { - public void drain(MessageDelivery message, ISourceController controller) { - write(message); - } - }); - - } else { - queue.setDrain(new IFlowDrain() { - public void drain(final MessageDelivery message, ISourceController controller) { - write(message); - }; - }); - } + queue.setDrain(new IFlowDrain() { + public void drain(final MessageDelivery message, ISourceController controller) { + Message msg = message.asType(Message.class); + MessageDispatch md = new MessageDispatch(); + md.setConsumerId(info.getConsumerId()); + md.setMessage(msg); + md.setDestination(msg.getDestination()); + write(md); + }; + }); } public IFlowSink getSink() { - // TODO Auto-generated method stub - return null; + return queue; } public boolean match(MessageDelivery message) { Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=753003&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java Thu Mar 12 20:32:50 2009 @@ -0,0 +1,82 @@ +package org.apache.activemq.broker.openwire; + +import javax.jms.MessageNotWriteableException; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.SessionInfo; + +public class OpenwireSupport { + + static private long idGenerator; + static private long msgIdGenerator; + + public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); + info.setBrowser(false); + info.setDestination(destination); + info.setPrefetchSize(1000); + info.setDispatchAsync(false); + return info; + } + + public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) { + return consumerInfo.createRemoveCommand(); + } + + public static ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception { + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); + return info; + } + + public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); + return info; + } + + public static ConnectionInfo createConnectionInfo() throws Exception { + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(new ConnectionId("connection:" + (++idGenerator))); + info.setClientId(info.getConnectionId().getValue()); + return info; + } + + public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) { + return createMessage(producerInfo, destination, 4, null); + } + + public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int priority, String payload) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setJMSPriority(priority); + message.setProducerId(producerInfo.getProducerId()); + message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator)); + message.setDestination(destination); + message.setPersistent(false); + if( payload!=null ) { + try { + message.setText(payload); + } catch (MessageNotWriteableException e) { + } + } + return message; + } + + public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) { + MessageAck ack = new MessageAck(); + ack.setAckType(ackType); + ack.setConsumerId(consumerInfo.getConsumerId()); + ack.setDestination(msg.getDestination()); + ack.setLastMessageId(msg.getMessageId()); + ack.setMessageCount(count); + return ack; + } + +} Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753003&r1=753002&r2=753003&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 20:32:50 2009 @@ -1,8 +1,8 @@ package org.apache.activemq.broker.openwire; -import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo; -import static org.apache.activemq.broker.openwire.Openwire2Support.createConsumerInfo; -import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createConsumerInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo; import java.net.URI; import java.util.concurrent.TimeUnit; @@ -17,6 +17,7 @@ import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.flow.Flow; @@ -108,9 +109,9 @@ if (command.getClass() == WireFormatInfo.class) { } else if (command.getClass() == BrokerInfo.class) { System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName()); - } else if (command.getClass() == MessageDelivery.class) { - MessageDelivery msg = (MessageDelivery) command; - inboundController.add(msg, null); + } else if (command.getClass() == MessageDispatch.class) { + MessageDispatch msg = (MessageDispatch) command; + inboundController.add(new OpenWireMessageDelivery(msg.getMessage()), null); } else { onException(new Exception("Unrecognized command: " + command)); } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753003&r1=753002&r2=753003&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Thu Mar 12 20:32:50 2009 @@ -1,9 +1,9 @@ package org.apache.activemq.broker.openwire; -import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo; -import static org.apache.activemq.broker.openwire.Openwire2Support.createMessage; -import static org.apache.activemq.broker.openwire.Openwire2Support.createProducerInfo; -import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createMessage; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createProducerInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo; import java.net.URI; import java.util.concurrent.atomic.AtomicLong;