Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 081D118A15 for ; Sat, 17 Oct 2015 08:38:55 +0000 (UTC) Received: (qmail 34164 invoked by uid 500); 17 Oct 2015 08:38:55 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 34018 invoked by uid 500); 17 Oct 2015 08:38:54 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 34000 invoked by uid 99); 17 Oct 2015 08:38:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 17 Oct 2015 08:38:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BD8DAE0A34; Sat, 17 Oct 2015 08:38:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Sat, 17 Oct 2015 08:38:56 -0000 Message-Id: <1056bb7c9b264e7dbfd2c2a3965b088a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/6] camel git commit: CAMEL-9232: camel-paho - Create exchange correct. Fixes #635. CAMEL-9232: camel-paho - Create exchange correct. Fixes #635. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2429e7a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2429e7a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2429e7a Branch: refs/heads/master Commit: d2429e7a475a6fd41bc39ee3dcaabd8af2d19362 Parents: 9e5a51c Author: Claus Ibsen Authored: Sat Oct 17 10:36:15 2015 +0200 Committer: Claus Ibsen Committed: Sat Oct 17 10:41:19 2015 +0200 ---------------------------------------------------------------------- .../camel/component/paho/MqttProperties.java | 71 -------------------- .../camel/component/paho/PahoComponent.java | 14 ---- .../camel/component/paho/PahoConstants.java | 3 +- .../camel/component/paho/PahoConsumer.java | 24 +------ .../camel/component/paho/PahoEndpoint.java | 33 +++++---- .../camel/component/paho/PahoMessage.java | 47 +++++++++++++ .../camel/component/paho/PahoComponentTest.java | 27 +++----- 7 files changed, 78 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java deleted file mode 100644 index 782654a..0000000 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.paho; - -/** - * MQTT message properties. - */ -public class MqttProperties { - - private String topic; - - private int qos; - - private boolean retain; - - private boolean duplicate; - - public MqttProperties() {} - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public int getQos() { - return qos; - } - - public void setQos(int qos) { - this.qos = qos; - } - - public boolean isRetain() { - return retain; - } - - public void setRetain(boolean retain) { - this.retain = retain; - } - - public boolean isDuplicate() { - return duplicate; - } - - public void setDuplicate(boolean duplicate) { - this.duplicate = duplicate; - } - - @Override - public String toString() { - return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]"; - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java index ea0b627..c9cbc41 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java @@ -27,7 +27,6 @@ public class PahoComponent extends UriEndpointComponent { private String brokerUrl; private String clientId; private MqttConnectOptions connectOptions; - private String headerType; public PahoComponent() { super(PahoEndpoint.class); @@ -46,9 +45,6 @@ public class PahoComponent extends UriEndpointComponent { if (connectOptions != null) { answer.setConnectOptions(connectOptions); } - if (headerType != null) { - answer.setHeaderType(headerType); - } setProperties(answer, parameters); return answer; @@ -87,14 +83,4 @@ public class PahoComponent extends UriEndpointComponent { this.connectOptions = connectOptions; } - public String getHeaderType() { - return headerType; - } - - /** - * Exchange header type. - */ - public void setHeaderType(String headerType) { - this.headerType = headerType; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java index 89d9994..4c8d1ff 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java @@ -18,8 +18,9 @@ package org.apache.camel.component.paho; public final class PahoConstants { - public static final String HEASER_MQTT_PROPERTIES = "MqttProperties"; + public static final String MQTT_TOPIC = "CamelMqttTopic"; + @Deprecated public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage"; private PahoConstants() { http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index 82b6c5f..75d6092 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -48,26 +48,8 @@ public class PahoConsumer extends DefaultConsumer { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - String headerKey; - Object headerValue; - String headerType = getEndpoint().getHeaderType(); - if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) { - headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE; - headerValue = message; - } else { - MqttProperties props = new MqttProperties(); - props.setTopic(topic); - props.setQos(message.getQos()); - props.setRetain(message.isRetained()); - props.setDuplicate(message.isDuplicate()); - - headerKey = PahoConstants.HEASER_MQTT_PROPERTIES; - headerValue = props; - } - - Exchange exchange = getEndpoint().createExchange(); - exchange.getIn().setBody(message.getPayload()); - exchange.getIn().setHeader(headerKey, headerValue); + LOG.debug("Message arrived on topic: {} -> {}", topic, message); + Exchange exchange = getEndpoint().createExchange(message, topic); getAsyncProcessor().process(exchange, new AsyncCallback() { @Override @@ -79,7 +61,7 @@ public class PahoConsumer extends DefaultConsumer { @Override public void deliveryComplete(IMqttDeliveryToken token) { - LOG.debug("Delivery complete. Token: {}.", token); + LOG.debug("Delivery complete. Token: {}", token); } }); } http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java index 0562c9a..97dad4d 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java @@ -18,10 +18,11 @@ package org.apache.camel.component.paho; import java.util.Set; -import static java.lang.System.nanoTime; +import javax.xml.xpath.XPathConstants; import org.apache.camel.Component; import org.apache.camel.Consumer; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; @@ -32,11 +33,13 @@ import org.apache.camel.spi.UriPath; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.System.nanoTime; import static org.apache.camel.component.paho.PahoPersistence.MEMORY; @UriEndpoint(scheme = "paho", title = "Paho", consumerClass = PahoConsumer.class, label = "messaging", syntax = "paho:topic") @@ -45,15 +48,11 @@ public class PahoEndpoint extends DefaultEndpoint { private static final Logger LOG = LoggerFactory.getLogger(PahoEndpoint.class); // Constants - private static final String DEFAULT_BROKER_URL = "tcp://localhost:1883"; - private static final int DEFAULT_QOS = 2; - private static final String DEFAULT_QOS_STRING = DEFAULT_QOS + ""; // Configuration members - @UriPath @Metadata(required = "true") private String topic; @UriParam @@ -64,8 +63,6 @@ public class PahoEndpoint extends DefaultEndpoint { private int qos = DEFAULT_QOS; @UriParam(defaultValue = "MEMORY") private PahoPersistence persistence = MEMORY; - @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES) - private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES; // Collaboration members @UriParam @@ -144,6 +141,17 @@ public class PahoEndpoint extends DefaultEndpoint { return new MqttConnectOptions(); } + public Exchange createExchange(MqttMessage mqttMessage, String topic) { + PahoMessage paho = new PahoMessage(); + paho.setMqttMessage(mqttMessage); + paho.setBody(mqttMessage.getPayload()); + paho.setHeader(PahoConstants.MQTT_TOPIC, topic); + + Exchange exchange = createExchange(); + exchange.setIn(paho); + return exchange; + } + // Configuration getters & setters public String getClientId() { @@ -225,15 +233,4 @@ public class PahoEndpoint extends DefaultEndpoint { this.connectOptions = connOpts; } - public String getHeaderType() { - return headerType; - } - - /** - * Exchange header type. - */ - public void setHeaderType(String headerType) { - this.headerType = headerType; - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java new file mode 100644 index 0000000..a89e470 --- /dev/null +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +import org.apache.camel.impl.DefaultMessage; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class PahoMessage extends DefaultMessage { + + private transient MqttMessage mqttMessage; + + public PahoMessage() { + } + + public PahoMessage(MqttMessage mqttMessage) { + this.mqttMessage = mqttMessage; + } + + public MqttMessage getMqttMessage() { + return mqttMessage; + } + + public void setMqttMessage(MqttMessage mqttMessage) { + this.mqttMessage = mqttMessage; + } + + @Override + public PahoMessage newInstance() { + return new PahoMessage(mqttMessage); + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java index 14e1515..8013f8f 100644 --- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java @@ -37,9 +37,6 @@ public class PahoComponentTest extends CamelTestSupport { @EndpointInject(uri = "mock:test") MockEndpoint mock; - @EndpointInject(uri = "mock:test2") - MockEndpoint mock2; - BrokerService broker; int mqttPort = AvailablePortFinder.getNextAvailable(); @@ -73,7 +70,6 @@ public class PahoComponentTest extends CamelTestSupport { from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test"); from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort); - from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + mqttPort).to("mock:test2"); from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest"); @@ -97,9 +93,8 @@ public class PahoComponentTest extends CamelTestSupport { + "?clientId=sampleClient" + "&brokerUrl=tcp://localhost:" + mqttPort + "&qos=2" - + "&persistence=file" - + "&headerType=testType"; - + + "&persistence=file"; + PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class); // Then @@ -108,7 +103,6 @@ public class PahoComponentTest extends CamelTestSupport { assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl()); assertEquals(2, endpoint.getQos()); assertEquals(PahoPersistence.FILE, endpoint.getPersistence()); - assertEquals("testType", endpoint.getHeaderType()); } @Test @@ -169,28 +163,29 @@ public class PahoComponentTest extends CamelTestSupport { // Then mock.assertIsSatisfied(); + Exchange exchange = mock.getExchanges().get(0); - MqttProperties mqttProperties = exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES, - MqttProperties.class); String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8"); - assertEquals("queue", new String(mqttProperties.getTopic())); - assertEquals(msg, new String(payload)); + assertEquals("queue", exchange.getIn().getHeader(PahoConstants.MQTT_TOPIC)); + assertEquals(msg, payload); } @Test public void shouldKeepOriginalMessageInHeader() throws InterruptedException { // Given final String msg = "msg"; - mock2.expectedBodiesReceived(msg); + mock.expectedBodiesReceived(msg); // When template.sendBody("direct:test2", msg); // Then - mock2.assertIsSatisfied(); - Exchange exchange = mock2.getExchanges().get(0); - MqttMessage message = exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, MqttMessage.class); + mock.assertIsSatisfied(); + Exchange exchange = mock.getExchanges().get(0); + + MqttMessage message = exchange.getIn(PahoMessage.class).getMqttMessage(); + assertNotNull(message); assertEquals(msg, new String(message.getPayload())); }