activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r753003 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/openwire/
Date Thu, 12 Mar 2009 20:32:50 GMT
Author: chirino
Date: Thu Mar 12 20:32:50 2009
New Revision: 753003

URL: http://svn.apache.org/viewvc?rev=753003&view=rev
Log:
Consumer actually gets messages now... just need to debug the flow control now.


Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
Removed:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java Thu Mar 12 20:32:50 2009
@@ -21,8 +21,8 @@
         queues.get(name).addConsumer(deliveryTarget);
     }
 
-    public Collection<DeliveryTarget> route(MessageDelivery name) {
-        Queue queue = queues.get(name);
+    public Collection<DeliveryTarget> route(MessageDelivery delivery) {
+        Queue queue = queues.get(delivery.getDestination().getName());
         if( queue!=null ) {
             ArrayList<DeliveryTarget> rc = new ArrayList<DeliveryTarget>(1);
             rc.add(queue);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java Thu Mar 12 20:32:50 2009
@@ -25,8 +25,8 @@
         targets.add(target);
     }
 
-    public Collection<DeliveryTarget> route(MessageDelivery name) {
-        return topicsTargets.get(name);
+    public Collection<DeliveryTarget> route(MessageDelivery delivery) {
+        return topicsTargets.get(delivery.getDestination().getName());
     }
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 20:32:50 2009
@@ -60,7 +60,6 @@
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.transport.DispatchableTransport;
 
 public class OpenwireBrokerConnection extends BrokerConnection {
 
@@ -363,25 +362,20 @@
                 }
             };
             queue = new SingleFlowRelay<MessageDelivery>(flow, name + "-outbound",
limiter);
-            if (transport instanceof DispatchableTransport) {
-                queue.setDrain(new IFlowDrain<MessageDelivery>() {
-                    public void drain(MessageDelivery message, ISourceController<MessageDelivery>
controller) {
-                        write(message);
-                    }
-                });
-
-            } else {
-                queue.setDrain(new IFlowDrain<MessageDelivery>() {
-                    public void drain(final MessageDelivery message, ISourceController<MessageDelivery>
controller) {
-                        write(message);
-                    };
-                });
-            }
+            queue.setDrain(new IFlowDrain<MessageDelivery>() {
+                public void drain(final MessageDelivery message, ISourceController<MessageDelivery>
controller) {
+                    Message msg = message.asType(Message.class);
+                    MessageDispatch md = new MessageDispatch();
+                    md.setConsumerId(info.getConsumerId());
+                    md.setMessage(msg);
+                    md.setDestination(msg.getDestination());
+                    write(md);
+                };
+            });
         }
 
         public IFlowSink<MessageDelivery> getSink() {
-            // TODO Auto-generated method stub
-            return null;
+            return queue;
         }
 
         public boolean match(MessageDelivery message) {

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=753003&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java Thu Mar 12 20:32:50 2009
@@ -0,0 +1,82 @@
+package org.apache.activemq.broker.openwire;
+
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class OpenwireSupport {
+    
+    static private long idGenerator;
+    static private long msgIdGenerator;
+
+    public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination
destination) throws Exception {
+        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+        info.setBrowser(false);
+        info.setDestination(destination);
+        info.setPrefetchSize(1000);
+        info.setDispatchAsync(false);
+        return info;
+    }
+
+    public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+        return consumerInfo.createRemoveCommand();
+    }
+
+    public static ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception
{
+        ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+        return info;
+    }
+
+    public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception
{
+        SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+        return info;
+    }
+
+    public static ConnectionInfo createConnectionInfo() throws Exception {
+        ConnectionInfo info = new ConnectionInfo();
+        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+        info.setClientId(info.getConnectionId().getValue());
+        return info;
+    }
+
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination
destination) {
+        return createMessage(producerInfo, destination, 4, null);
+    }
+    
+    public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo, ActiveMQDestination
destination, int priority, String payload) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setJMSPriority(priority);
+        message.setProducerId(producerInfo.getProducerId());
+        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        if( payload!=null ) {
+            try {
+                message.setText(payload);
+            } catch (MessageNotWriteableException e) {
+            }
+        }
+        return message;
+    }
+
+    public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count,
byte ackType) {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(ackType);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setDestination(msg.getDestination());
+        ack.setLastMessageId(msg.getMessageId());
+        ack.setMessageCount(count);
+        return ack;
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 20:32:50 2009
@@ -1,8 +1,8 @@
 package org.apache.activemq.broker.openwire;
 
-import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
-import static org.apache.activemq.broker.openwire.Openwire2Support.createConsumerInfo;
-import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createConsumerInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo;
 
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
@@ -17,6 +17,7 @@
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.flow.Flow;
@@ -108,9 +109,9 @@
             if (command.getClass() == WireFormatInfo.class) {
             } else if (command.getClass() == BrokerInfo.class) {
                 System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
-            } else if (command.getClass() == MessageDelivery.class) {
-                MessageDelivery msg = (MessageDelivery) command;
-                inboundController.add(msg, null);
+            } else if (command.getClass() == MessageDispatch.class) {
+                MessageDispatch msg = (MessageDispatch) command;
+                inboundController.add(new OpenWireMessageDelivery(msg.getMessage()), null);
             } else {
                 onException(new Exception("Unrecognized command: " + command));
             }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Thu Mar 12 20:32:50 2009
@@ -1,9 +1,9 @@
 package org.apache.activemq.broker.openwire;
 
-import static org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
-import static org.apache.activemq.broker.openwire.Openwire2Support.createMessage;
-import static org.apache.activemq.broker.openwire.Openwire2Support.createProducerInfo;
-import static org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createMessage;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createProducerInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo;
 
 import java.net.URI;
 import java.util.concurrent.atomic.AtomicLong;



Mime
View raw message