activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r475289 - /incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
Date Wed, 15 Nov 2006 16:08:47 GMT
Author: jstrachan
Date: Wed Nov 15 08:08:46 2006
New Revision: 475289

URL: http://svn.apache.org/viewvc?view=rev&rev=475289
Log:
use a temporary queue so replies can be sent to the client when they send to admin topics
like ActiveMQ.Agent

Modified:
    incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java

Modified: incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java?view=diff&rev=475289&r1=475288&r2=475289
==============================================================================
--- incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
(original)
+++ incubator/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
Wed Nov 15 08:08:46 2006
@@ -28,10 +28,12 @@
 import jabber.client.Message;
 import jabber.client.Presence;
 import jabber.iq.auth.Query;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.*;
 import org.apache.activemq.transport.xmpp.command.Handler;
 import org.apache.activemq.transport.xmpp.command.HandlerRegistry;
 import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntSequenceGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,16 +76,19 @@
     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
+    private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator();
 
     private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer,
Handler<Response>>();
     private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
     private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String,
ConsumerInfo>();
+    private final Map<String, ConsumerInfo> jidToInboxConsumerMap = new HashMap<String,
ConsumerInfo>();
 
     private final Map transactions = new ConcurrentHashMap();
 
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
+    private ActiveMQTempQueue inboxDestination;
 
     public ProtocolConverter(XmppTransport transport) {
         this.transport = transport;
@@ -368,14 +373,32 @@
             log.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
             return;
         }
+        subscribe(to, destination, jidToConsumerMap);
 
+        // lets subscribe to a personal inbox for replies
+
+        // Check if Destination info is of temporary type.
+        if (inboxDestination == null) {
+            inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
+
+            DestinationInfo info = new DestinationInfo();
+            info.setConnectionId(connectionInfo.getConnectionId());
+            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+            info.setDestination(inboxDestination);
+            sendToActiveMQ(info, null);
+
+            subscribe(to, inboxDestination, jidToInboxConsumerMap);
+        }
+    }
+
+    protected void subscribe(final String to, ActiveMQDestination destination, Map<String,
ConsumerInfo> consumerMap) {
         boolean createConsumer = false;
         ConsumerInfo consumerInfo = null;
-        synchronized (jidToConsumerMap) {
-            consumerInfo = jidToConsumerMap.get(to);
+        synchronized (consumerMap) {
+            consumerInfo = consumerMap.get(to);
             if (consumerInfo == null) {
                 consumerInfo = new ConsumerInfo();
-                jidToConsumerMap.put(to, consumerInfo);
+                consumerMap.put(to, consumerInfo);
 
                 ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
                 consumerInfo.setConsumerId(consumerId);
@@ -404,7 +427,7 @@
                 Message message = createXmppMessage(to, messageDispatch);
                 if (message != null) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Sending message to XMPP client from: " + message.getFrom()
+ " to: " + message.getTo() +  " type: " + message.getType() + " with body: " + message.getAny());
+                        log.debug("Sending message to XMPP client from: " + message.getFrom()
+ " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny());
                     }
                     transport.marshall(message);
                 }
@@ -417,7 +440,7 @@
         Message answer = new Message();
         answer.setType("groupchat");
         String from = to;
-        int idx= from.indexOf('/');
+        int idx = from.indexOf('/');
         if (idx > 0) {
             from = from.substring(0, idx) + "/broker";
         }
@@ -429,11 +452,14 @@
         if (message instanceof ActiveMQTextMessage) {
             ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
             Body body = new Body();
-            body.setValue(activeMQTextMessage.getText());
+            String text = activeMQTextMessage.getText();
+            log.info("Setting the body text to be: " + text);
+            body.setValue(text);
             answer.getAny().add(body);
         }
         else {
             // TODO support other message types
+            log.warn("Could not convert the message to a complete Jabber message: " + message);
         }
         return answer;
     }
@@ -555,6 +581,13 @@
         if (idx > 0) {
             name = name.substring(0, idx);
         }
+
+        System.out.println("#### Creating ActiveMQ destination for: " + name);
+
+        // lets support lower-case versions of the agent topic
+        if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) {
+            name = AdvisorySupport.AGENT_TOPIC;
+        }
         return new ActiveMQTopic(name);
     }
 
@@ -579,7 +612,12 @@
         answer.setStringProperty("XMPPLang", message.getLang());
         answer.setStringProperty("XMPPTo", message.getTo());
         answer.setJMSType(message.getType());
-        answer.setJMSReplyTo(createActiveMQDestination(message.getFrom()));
+        ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom());
+        if (replyTo == null) {
+            replyTo = inboxDestination;
+        }
+        System.out.println("Setting reply to destination to: " + replyTo);
+        answer.setJMSReplyTo(replyTo);
     }
 
     protected void onAuth(Auth auth) throws Exception {



Mime
View raw message