CAMEL-10423 - Introduce XmppDirectProducer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ca48598a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ca48598a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ca48598a Branch: refs/heads/master Commit: ca48598aeadf26eb71f49a983c0368a2dcb74521 Parents: b86cc8e Author: Allan C Authored: Wed Nov 2 10:31:21 2016 +0800 Committer: Andrea Cosentino Committed: Wed Nov 2 09:44:32 2016 +0100 ---------------------------------------------------------------------- .../component/xmpp/XmppDirectProducer.java | 84 ++++++++++++++++++++ .../camel/component/xmpp/XmppEndpoint.java | 7 ++ 2 files changed, 91 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ca48598a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppDirectProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppDirectProducer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppDirectProducer.java new file mode 100644 index 0000000..bcf6312 --- /dev/null +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppDirectProducer.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.xmpp; + +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.impl.DefaultProducer; +import org.jivesoftware.smack.XMPPConnection; +import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smack.packet.Packet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class XmppDirectProducer extends DefaultProducer { + + private static final transient Logger LOG = LoggerFactory.getLogger(XmppDirectProducer.class); + + private final XmppEndpoint endpoint; + + private XMPPConnection connection; + + public XmppDirectProducer(XmppEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + } + + @Override + public void process(Exchange exchange) throws Exception { + try { + if (connection == null) { + connection = endpoint.createConnection(); + } + + // make sure we are connected + if (!connection.isConnected()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reconnecting to: " + XmppEndpoint.getConnectionMessage(connection)); + } + + connection.connect(); + } + } catch (XMPPException e) { + throw new RuntimeExchangeException("Cannot connect to XMPP Server: " + + ((connection != null) ? XmppEndpoint.getConnectionMessage(connection) : endpoint.getHost()), exchange, e); + } + + try { + Object body = exchange.getIn().getBody(); + if (body instanceof Packet) { + connection.sendPacket((Packet) body); + + } else if (body instanceof Packet[]) { + final Packet[] packets = (Packet[]) body; + for (final Packet packet : packets) { + connection.sendPacket(packet); + } + + } else { + throw new Exception("Body does not contain Packet/Packet[] object(s)"); + } + } catch (XMPPException xmppe) { + throw new RuntimeExchangeException("Cannot send XMPP direct: from " + endpoint.getUser() + + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, xmppe); + + } catch (Exception e) { + throw new RuntimeExchangeException("Cannot send XMPP direct: from " + endpoint.getUser() + + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ca48598a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java index 1def69d..949af61 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java @@ -111,6 +111,9 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg if (isPubsub()) { return createPubSubProducer(); } + if (isDoc()) { + return createDirectProducer(); + } if (getParticipant() == null) { throw new IllegalArgumentException("No room or participant configured on this endpoint: " + this); } @@ -126,6 +129,10 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg return new XmppPrivateChatProducer(this, participant); } + public Producer createDirectProducer() throws Exception { + return new XmppDirectProducer(this); + } + public Producer createPubSubProducer() throws Exception { return new XmppPubSubProducer(this); }