camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [camel] 03/11: Add camel-nsq component.
Date Thu, 08 Nov 2018 08:39:45 GMT
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9eea1616d04806fbbb48d905dadc12de33f3f7b0
Author: mionker <mionker@icloud.com>
AuthorDate: Fri Nov 2 21:58:37 2018 +0100

    Add camel-nsq component.
---
 .../camel-nsq/src/main/docs/nsq-component.adoc     | 129 +++++++++++++++++++++
 .../apache/camel/component/nsq/NsqComponent.java   |  24 +++-
 .../camel/component/nsq/NsqConfiguration.java      |  15 ++-
 .../apache/camel/component/nsq/NsqConsumer.java    |   2 +-
 .../apache/camel/component/nsq/NsqEndpoint.java    |  13 +++
 .../apache/camel/component/nsq/NsqProducer.java    |   4 +-
 6 files changed, 179 insertions(+), 8 deletions(-)

diff --git a/components/camel-nsq/src/main/docs/nsq-component.adoc b/components/camel-nsq/src/main/docs/nsq-component.adoc
new file mode 100644
index 0000000..39bed9f
--- /dev/null
+++ b/components/camel-nsq/src/main/docs/nsq-component.adoc
@@ -0,0 +1,129 @@
+[[nsq-component]]
+== nsq Component
+
+
+http://nsq.io/[NSQ] is a realtime distributed messaging platform.
+
+Maven users will need to add the following dependency to
+their `pom.xml` for this component.
+
+[source,xml]
+------------------------------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-nsq</artifactId>
+    <!-- use the same version as your Camel core version -->
+    <version>x.y.z</version>
+</dependency>
+------------------------------------------------------------
+
+### URI format
+
+[source,java]
+----------------------
+nsq:servers[?options]
+----------------------
+
+Where *servers* represents the list of NSQ servers - nsqlookupd servers in the case of
a consumer and nsqd servers in the case of a producer.
+
+### Options
+
+
+// component options: START
+The Nsq component supports 2 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *useGlobalSslContext Parameters* (security) | Enable usage of global SSL context parameters.
| false | boolean
+| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property
placeholders on itself when starting. Only properties which are of String type can use property
placeholders. | true | boolean
+|===
+// component options: END
+
+
+
+
+
+// endpoint options: START
+The NSQ endpoint is configured using URI syntax:
+
+----
+nsq:servers
+----
+
+with the following path and query parameters:
+
+==== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *servers* | *Required* URLs to one or more nsqlookupd (consumer) or nsqd (producer) servers.
Use comma to separate URLs when specifying multiple servers. The port can also be specified
using <hostname>:<port>. |  | String
+|===
+
+
+==== Query Parameters (25 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *topic* (common) | *Required* The name of topic we want to use |  | String
+| *channel* (consumer) | *Required* The name of channel we want to use |  | String
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing
Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming
messages, or the likes, will now be processed as a message and handled by the routing Error
Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal
with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *poolSize* (consumer) | Consumer pool size | 10 | int
+| *lookupServerPort* (consumer) | The default port number to use to connect to nsqlookupd
server | 4161 | int
+| *port* (producer) | The default port number to use to connect to nsqd server | 4150 | int
+| *lookupInterval* (consumer) | The retry interval in milliseconds to lookup the topic at
the nsqlookupd server. | 5000 | int
+| *requeueInterval* (consumer) | The requeue interval. Default of 0 will
+
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+=== Spring Boot Auto-Configuration
+
+// spring-boot-auto-configure options: END
+
+
+
+
+
+### Headers
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Name |Type |Description
+
+|CamelNsqMessageTimestamp |long |The timestamp of a consumed message.
+|=======================================================================
+ 
+*Producer example:*
+
+[source,java]
+-----------------------------------------------------------
+from("direct:send").to("nsq:localhost:4150?topic=test");
+-----------------------------------------------------------
+
+In case of using Authorization you can directly specify your credentials in the server URL
+
+[source,java]
+-----------------------------------------------------------
+from("direct:send").to("nats://username:password@localhost:4222?topic=test");
+-----------------------------------------------------------
+
+or your token
+
+[source,java]
+-----------------------------------------------------------
+from("direct:send").to("nats://token@localhost:4222?topic=test");
+-----------------------------------------------------------
+
+*Consumer example:*
+
+[source,java]
+----------------------------------------------------------------------------------------
+from("nsq:localhost:4161?topic=test&channel=test").to("mock:result");
+----------------------------------------------------------------------------------------
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqComponent.java
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqComponent.java
index 6c92fd4..eda6413 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqComponent.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqComponent.java
@@ -1,17 +1,20 @@
 package org.apache.camel.component.nsq;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.SSLContextParametersAware;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.Metadata;
 
 import java.util.Map;
 
-import static org.apache.camel.util.IntrospectionSupport.setProperties;
-
 /**
  * Represents the component that manages {@link NsqEndpoint}.
  */
-public class NsqComponent extends DefaultComponent {
-    
+public class NsqComponent extends DefaultComponent implements SSLContextParametersAware {
+
+    @Metadata(label = "security", defaultValue = "false")
+    private boolean useGlobalSslContextParameters;
+
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object>
parameters) throws Exception {
         NsqConfiguration config = new NsqConfiguration();
         setProperties(config, parameters);
@@ -20,4 +23,17 @@ public class NsqComponent extends DefaultComponent {
         NsqEndpoint endpoint = new NsqEndpoint(uri, this, config);
         return endpoint;
     }
+
+    @Override
+    public boolean isUseGlobalSslContextParameters() {
+        return this.useGlobalSslContextParameters;
+    }
+
+    /**
+     * Enable usage of global SSL context parameters.
+     */
+    @Override
+    public void setUseGlobalSslContextParameters(boolean useGlobalSslContextParameters) {
+        this.useGlobalSslContextParameters = useGlobalSslContextParameters;
+    }
 }
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
index 953c7d7..2c01f99 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
@@ -6,6 +6,7 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.jsse.SSLContextParameters;
 
 import java.util.Set;
 
@@ -33,6 +34,8 @@ public class NsqConfiguration {
     private long lookupInterval = 5000;
     @UriParam(label = "consumer", defaultValue = "0", description = "The requeue interval")
     private long requeueInterval = 0;
+    @UriParam(label = "security")
+    private SSLContextParameters sslContextParameters;
 
     /*
      * URL a NSQ lookup server hostname.
@@ -137,7 +140,7 @@ public class NsqConfiguration {
     }
 
     /**
-     * The requeue retry interval
+     * The requeue interval
      */
     public long getRequeueInterval() {
         return requeueInterval;
@@ -147,6 +150,16 @@ public class NsqConfiguration {
         this.requeueInterval = requeueInterval;
     }
 
+    /**
+     * To configure security using SSLContextParameters
+     */
+    public SSLContextParameters getSslContextParameters() {
+        return sslContextParameters;
+    }
+
+    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
 
     private String splitServers() {
         StringBuilder servers = new StringBuilder();
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
index f41a888..6402fe5 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
@@ -85,7 +85,7 @@ public class NsqConsumer extends DefaultConsumer {
             public void message(NSQMessage msg) {
                 LOG.debug("Received Message: {}", msg);
                 Exchange exchange = getEndpoint().createExchange();
-                exchange.getIn().setBody(new String(msg.getMessage()));
+                exchange.getIn().setBody(msg.getMessage());
                 exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID, msg.getId());
                 exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, msg.getAttempts());
                 exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP, msg.getTimestamp());
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
index 1b25880..e792316 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
@@ -1,5 +1,8 @@
 package org.apache.camel.component.nsq;
 
+import com.github.brainlag.nsq.NSQConfig;
+import io.netty.handler.ssl.JdkSslContext;
+import io.netty.handler.ssl.SslContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -7,6 +10,8 @@ import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 
+import java.io.IOException;
+import java.security.GeneralSecurityException;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -42,4 +47,12 @@ public class NsqEndpoint extends DefaultEndpoint {
     public NsqConfiguration getNsqConfiguration() {
         return configuration;
     }
+
+    public NSQConfig getNsqConfig() throws GeneralSecurityException, IOException {
+        NSQConfig nsqConfig = new NSQConfig();
+        SslContext sslContext = new JdkSslContext(getNsqConfiguration().getSslContextParameters().createSSLContext(getCamelContext()),
true, null);
+        nsqConfig.setSslContext(sslContext);
+
+        return nsqConfig;
+    }
 }
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqProducer.java
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqProducer.java
index a886eae..630a2fa 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqProducer.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqProducer.java
@@ -33,8 +33,8 @@ public class NsqProducer extends DefaultProducer {
 
         LOG.debug("Publishing to topic: {}", topic);
 
-        String body = exchange.getIn().getBody(String.class);
-        producer.produce(topic, body.getBytes());
+        byte[] body = exchange.getIn().getBody(byte[].class);
+        producer.produce(topic, body);
     }
 
     @Override


Mime
View raw message