camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: #CAMEL-3195 Allow camel to send custom xmpp PubSub packet to a xmpp endpoint
Date Thu, 04 Dec 2014 07:38:10 GMT
Repository: camel
Updated Branches:
  refs/heads/master f229d67fe -> bd666dbc0


#CAMEL-3195 Allow camel to send custom xmpp PubSub packet to a xmpp endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b1ff1aa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b1ff1aa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b1ff1aa

Branch: refs/heads/master
Commit: 1b1ff1aa515a2a6b964f6e98d85cec0e3dff171c
Parents: f229d67
Author: Hugo Freire <hfreire@abajar.com>
Authored: Thu Feb 6 15:59:14 2014 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Thu Dec 4 08:29:33 2014 +0100

----------------------------------------------------------------------
 .../camel/component/xmpp/XmppBinding.java       | 62 ++++++++++++++++----
 .../camel/component/xmpp/XmppConstants.java     |  1 +
 .../camel/component/xmpp/XmppConsumer.java      | 16 +++++
 .../camel/component/xmpp/XmppEndpoint.java      | 38 ++++++++++--
 .../camel/component/xmpp/XmppMessage.java       | 38 ++++++++----
 .../component/xmpp/XmppPubSubProducer.java      | 60 +++++++++++++++++++
 .../component/xmpp/UriConfigurationTest.java    | 15 +++++
 7 files changed, 204 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
index 58debd7..1f43527 100644
--- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
+++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppBinding.java
@@ -25,6 +25,9 @@ import org.apache.camel.impl.DefaultHeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ObjectHelper;
 import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Packet;
+import org.jivesoftware.smack.packet.Presence;
+import org.jivesoftware.smackx.pubsub.packet.PubSub;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,31 +89,68 @@ public class XmppBinding {
             message.setProperty("exchangeId", id);
         }
     }
+    
+    /**
+     * Populates the given XMPP packet from the inbound exchange
+     */
+    public void populateXmppPacket(Packet packet, Exchange exchange) {
+        Set<Map.Entry<String, Object>> entries = exchange.getIn().getHeaders().entrySet();
+        for (Map.Entry<String, Object> entry : entries) {
+            String name = entry.getKey();
+            Object value = entry.getValue();
+            if (!headerFilterStrategy.applyFilterToCamelHeaders(name, value, exchange)) {
+                    try {
+                    	packet.setProperty(name, value);
+                        LOG.debug("Added property name: " + name + " value: " + value.toString());
+                    } catch (IllegalArgumentException iae) {
+                        LOG.debug("Not adding property " + name + " to XMPP message due to " + iae);
+                    }
+                }
+            }        
+        String id = exchange.getExchangeId();
+        if (id != null) {
+        	packet.setProperty("exchangeId", id);
+        }
+    }
+    
 
     /**
      * Extracts the body from the XMPP message
      */
-    public Object extractBodyFromXmpp(Exchange exchange, Message message) {
-        return message.getBody();
+    public Object extractBodyFromXmpp(Exchange exchange, Packet xmppPacket) {
+        return (xmppPacket instanceof Message)? GetMessageBody((Message)xmppPacket): xmppPacket;
+    }
+    
+    private Object GetMessageBody(Message message) {
+    	String messageBody = message.getBody();
+    	if(messageBody == null) //probably a pubsub message
+    		return message;
+    	return messageBody;
     }
 
-    public Map<String, Object> extractHeadersFromXmpp(Message xmppMessage, Exchange exchange) {
+    public Map<String, Object> extractHeadersFromXmpp(Packet xmppPacket, Exchange exchange) {
         Map<String, Object> answer = new HashMap<String, Object>();
 
-        for (String name : xmppMessage.getPropertyNames()) {
-            Object value = xmppMessage.getProperty(name);
+        for (String name : xmppPacket.getPropertyNames()) {
+            Object value = xmppPacket.getProperty(name);
 
             if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
                 answer.put(name, value);
             }
         }
 
-        answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType());
-        answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject());
-        answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread());
-        answer.put(XmppConstants.FROM, xmppMessage.getFrom());
-        answer.put(XmppConstants.PACKET_ID, xmppMessage.getPacketID());
-        answer.put(XmppConstants.TO, xmppMessage.getTo());
+        if(xmppPacket instanceof Message) {
+            Message xmppMessage = (Message)xmppPacket;
+            answer.put(XmppConstants.MESSAGE_TYPE, xmppMessage.getType());
+            answer.put(XmppConstants.SUBJECT, xmppMessage.getSubject());
+            answer.put(XmppConstants.THREAD_ID, xmppMessage.getThread());
+        } else if(xmppPacket instanceof PubSub) {
+        	PubSub pubsubPacket = (PubSub)xmppPacket;
+            answer.put(XmppConstants.MESSAGE_TYPE, pubsubPacket.getType());
+        }
+        answer.put(XmppConstants.FROM, xmppPacket.getFrom());
+        answer.put(XmppConstants.PACKET_ID, xmppPacket.getPacketID());
+        answer.put(XmppConstants.TO, xmppPacket.getTo());
                 
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
index 68649ea..1251e9c 100644
--- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
+++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConstants.java
@@ -28,4 +28,5 @@ public interface XmppConstants {
     String FROM = "CamelXmppFrom";
     String PACKET_ID = "CamelXmppPacketID";
     String TO = "CamelXmppTo";
+    String docHeader = "doc";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
index 657c504..ef57fda 100644
--- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
+++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
@@ -31,9 +31,12 @@ import org.jivesoftware.smack.SmackConfiguration;
 import org.jivesoftware.smack.XMPPConnection;
 import org.jivesoftware.smack.XMPPException;
 import org.jivesoftware.smack.filter.AndFilter;
+import org.jivesoftware.smack.filter.MessageTypeFilter;
+import org.jivesoftware.smack.filter.OrFilter;
 import org.jivesoftware.smack.filter.PacketTypeFilter;
 import org.jivesoftware.smack.filter.ToContainsFilter;
 import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Message.Type;
 import org.jivesoftware.smack.packet.Packet;
 import org.jivesoftware.smack.packet.Presence;
 import org.jivesoftware.smackx.muc.DiscussionHistory;
@@ -79,6 +82,14 @@ public class XmppConsumer extends DefaultConsumer implements PacketListener, Mes
 
         chatManager = connection.getChatManager();
         chatManager.addChatListener(this);
+        
+        OrFilter pubsubPacketFilter = new OrFilter();
+        if(endpoint.isPubsub()){
+        	//xep-0060: pubsub#notification_type can be 'headline' or 'normal'
+        	pubsubPacketFilter.addFilter(new MessageTypeFilter(Type.headline));
+        	pubsubPacketFilter.addFilter(new MessageTypeFilter(Type.normal));
+        	connection.addPacketListener(this, pubsubPacketFilter);
+        }
 
         if (endpoint.getRoom() == null) {
             privateChat = chatManager.getThreadChat(endpoint.getChatId());
@@ -209,6 +220,10 @@ public class XmppConsumer extends DefaultConsumer implements PacketListener, Mes
         }
 
         Exchange exchange = endpoint.createExchange(message);
+
+        if(endpoint.isDoc() == true) {
+        	exchange.getIn().setHeader(XmppConstants.docHeader, message);
+        }
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
@@ -222,4 +237,5 @@ public class XmppConsumer extends DefaultConsumer implements PacketListener, Mes
             }
         }
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
index 8c3f853..a20799b 100644
--- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
+++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
@@ -62,6 +62,10 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg
     private String nickname;
     private String serviceName;
     private XMPPConnection connection;
+    private boolean pubsub = false;
+    //Set a doc header on the IN message containing a Document form of the incoming packet;
+    //default is true if pubsub is true, otherwise false
+    private boolean doc = false;
     private boolean testConnectionOnStartup = true;
     private int connectionPollDelay = 10;
 
@@ -81,6 +85,9 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg
         if (room != null) {
             return createGroupChatProducer();
         } else {
+        	if(isPubsub() == true) {
+        		return createPubSubProducer();
+        	}
             if (getParticipant() == null) {
                 throw new IllegalArgumentException("No room or participant configured on this endpoint: " + this);
             }
@@ -95,6 +102,10 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg
     public Producer createPrivateChatProducer(String participant) throws Exception {
         return new XmppPrivateChatProducer(this, participant);
     }
+    
+    public Producer createPubSubProducer() throws Exception {
+    	return new XmppPubSubProducer(this);
+    }
 
     public Consumer createConsumer(Processor processor) throws Exception {
         XmppConsumer answer = new XmppConsumer(this, processor);
@@ -107,14 +118,14 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg
         return createExchange(pattern, null);
     }
 
-    public Exchange createExchange(Message message) {
-        return createExchange(getExchangePattern(), message);
+    public Exchange createExchange(Packet packet) {
+        return createExchange(getExchangePattern(), packet);
     }
 
-    private Exchange createExchange(ExchangePattern pattern, Message message) {
+    private Exchange createExchange(ExchangePattern pattern, Packet packet) {
         Exchange exchange = new DefaultExchange(this, getExchangePattern());
         exchange.setProperty(Exchange.BINDING, getBinding());
-        exchange.setIn(new XmppMessage(message));
+        exchange.setIn(new XmppMessage(packet));
         return exchange;
     }
 
@@ -369,6 +380,25 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg
         this.connectionPollDelay = connectionPollDelay;
     }
 
+	public void setPubsub(boolean pubsub) {
+		this.pubsub = pubsub;
+		if(pubsub == true) {
+			setDoc(true);
+		}
+	}
+
+	public boolean isPubsub() {
+		return pubsub;
+	}
+
+	public void setDoc(boolean doc) {
+		this.doc = doc;
+	}
+
+	public boolean isDoc() {
+		return doc;
+	}
+
     // Implementation methods
     // -------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
----------------------------------------------------------------------
diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
index 016f1fd..b75243c 100644
--- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
+++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppMessage.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ExchangeHelper;
 import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Packet;
 
 /**
  * Represents a {@link org.apache.camel.Message} for working with XMPP
@@ -28,20 +29,24 @@ import org.jivesoftware.smack.packet.Message;
  * @version 
  */
 public class XmppMessage extends DefaultMessage {
-    private Message xmppMessage;
+    private Packet xmppPacket;
 
     public XmppMessage() {
         this(new Message());
     }
 
     public XmppMessage(Message jmsMessage) {
-        this.xmppMessage = jmsMessage;
+        this.xmppPacket = jmsMessage;
     }
-
+    
+    public XmppMessage(Packet jmsMessage) {
+        this.xmppPacket = jmsMessage;
+    }
+    
     @Override
     public String toString() {
-        if (xmppMessage != null) {
-            return "XmppMessage: " + xmppMessage;
+        if (xmppPacket != null) {
+            return "XmppMessage: " + xmppPacket;
         } else {
             return "XmppMessage: " + getBody();
         }
@@ -51,11 +56,22 @@ public class XmppMessage extends DefaultMessage {
      * Returns the underlying XMPP message
      */
     public Message getXmppMessage() {
-        return xmppMessage;
+        return (xmppPacket instanceof Message) ? (Message)xmppPacket : null;
     }
 
     public void setXmppMessage(Message xmppMessage) {
-        this.xmppMessage = xmppMessage;
+        this.xmppPacket = xmppMessage;
+    }
+
+    /**
+     * Returns the underlying XMPP packet
+     */
+    public Packet getXmppPacket() {
+        return xmppPacket;
+    }
+
+    public void setXmppPacket(Packet xmppPacket) {
+        this.xmppPacket = xmppPacket;
     }
     
     @Override
@@ -65,10 +81,10 @@ public class XmppMessage extends DefaultMessage {
 
     @Override
     protected Object createBody() {
-        if (xmppMessage != null) {
+        if (xmppPacket != null) {
             XmppBinding binding = ExchangeHelper.getBinding(getExchange(), XmppBinding.class);
             if (binding != null) {
-                return binding.extractBodyFromXmpp(getExchange(), xmppMessage);
+                return (getHeader(XmppConstants.docHeader) == null) ? binding.extractBodyFromXmpp(getExchange(), xmppPacket): getHeader(XmppConstants.docHeader);
             }
         }
         return null;
@@ -76,10 +92,10 @@ public class XmppMessage extends DefaultMessage {
     
     @Override
     protected void populateInitialHeaders(Map<String, Object> map) {
-        if (xmppMessage != null) {
+        if (xmppPacket != null) {
             XmppBinding binding = ExchangeHelper.getBinding(getExchange(), XmppBinding.class);
             if (binding != null) {
-                map.putAll(binding.extractHeadersFromXmpp(xmppMessage, getExchange()));
+                map.putAll(binding.extractHeadersFromXmpp(xmppPacket, getExchange()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java
new file mode 100644
index 0000000..20d46a4
--- /dev/null
+++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPubSubProducer.java
@@ -0,0 +1,60 @@
+package org.apache.camel.component.xmpp;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.impl.DefaultProducer;
+import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.XMPPException;
+import org.jivesoftware.smackx.pubsub.packet.PubSub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class XmppPubSubProducer extends DefaultProducer {
+	private static final transient Logger LOG = LoggerFactory.getLogger(XmppPrivateChatProducer.class);
+    private final XmppEndpoint endpoint;
+    private XMPPConnection connection;
+
+    public XmppPubSubProducer(XmppEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        LOG.debug("Creating XmppPresenceProducer");
+    }
+
+	public void process(Exchange exchange) throws Exception {
+        try {
+            if (connection == null) {
+                connection = endpoint.createConnection();
+            }
+
+            // make sure we are connected
+            if (!connection.isConnected()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Reconnecting to: " + XmppEndpoint.getConnectionMessage(connection));
+                }
+                connection.connect();
+            }
+        } catch (XMPPException e) {
+            throw new RuntimeExchangeException("Cannot connect to XMPP Server: "
+                    + ((connection != null) ? XmppEndpoint.getConnectionMessage(connection): endpoint.getHost()), exchange, e);
+        }
+        
+        try {
+            Object body = exchange.getIn().getBody(Object.class);
+            if(body instanceof PubSub) {
+            	PubSub pubsubpacket = (PubSub) body;
+                endpoint.getBinding().populateXmppPacket(pubsubpacket, exchange);
+            	exchange.getIn().setHeader(XmppConstants.docHeader, pubsubpacket);
+            	connection.sendPacket(pubsubpacket);
+            } else {
+                throw new Exception("Message does not contain a pubsub packet");
+            }        	
+        } catch (XMPPException xmppe) {
+            throw new RuntimeExchangeException("Cannot send XMPP pubsub: from " + endpoint.getUser()
+                    + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, xmppe);
+        } catch (Exception e) {
+            throw new RuntimeExchangeException("Cannot send XMPP pubsub: from " + endpoint.getUser()
+                    + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, e);
+        }
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1b1ff1aa/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
index abb9699..39ef33f 100644
--- a/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
+++ b/components/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/UriConfigurationTest.java
@@ -69,4 +69,19 @@ public class UriConfigurationTest extends Assert {
 
         assertEquals("Camel", xmppEndpoint.getResource());
     }
+    
+    @Test
+    public void testPubSubConfiguration() throws Exception {
+        Endpoint endpoint = context.getEndpoint("xmpp://camel-user@localhost:123?password=secret&pubsub=true");
+        assertTrue("Endpoint not an XmppEndpoint: " + endpoint, endpoint instanceof XmppEndpoint);
+        XmppEndpoint xmppEndpoint = (XmppEndpoint) endpoint;
+
+        assertEquals("localhost", xmppEndpoint.getHost());
+        assertEquals(123, xmppEndpoint.getPort());
+        assertEquals("camel-user", xmppEndpoint.getUser());
+        assertEquals("secret", xmppEndpoint.getPassword());
+        assertEquals(true, xmppEndpoint.isPubsub());
+        assertEquals(true, xmppEndpoint.isDoc());
+    }
+    
 }


Mime
View raw message