pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: [pulsar-client-tools] Add support for websocket produce/consume command (#3835)
Date Tue, 19 Mar 2019 17:37:34 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bc0d26  [pulsar-client-tools] Add support for websocket produce/consume command (#3835)
9bc0d26 is described below

commit 9bc0d268608bce7957dbc9a3207c90ccae697c39
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Tue Mar 19 10:37:29 2019 -0700

    [pulsar-client-tools] Add support for websocket produce/consume command (#3835)
    
    * [pulsar-client-tools] Add support for websocket produce/consume command
    
    * remove comments
---
 pulsar-client-tools/pom.xml                        |   5 +
 .../org/apache/pulsar/client/cli/CmdConsume.java   | 189 ++++++++++++++++++++-
 .../org/apache/pulsar/client/cli/CmdProduce.java   | 188 +++++++++++++++++++-
 .../apache/pulsar/client/cli/PulsarClientTool.java |  10 +-
 4 files changed, 380 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 072657b..8a3ac94 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -66,6 +66,11 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-websocket</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- functions related dependencies (begin) -->
     <dependency>
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index f19e6a5..4f86556 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -20,17 +20,37 @@ package org.apache.pulsar.client.cli;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.HexDump;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +58,9 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 
 /**
  * pulsar-client consume command implementation.
@@ -68,8 +91,10 @@ public class CmdConsume {
     @Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at which to consume, "
             + "value 0 means to consume messages as fast as possible.")
     private double consumeRate = 0;
-
-    ClientBuilder clientBuilder;
+    
+    private ClientBuilder clientBuilder;
+    private Authentication authentication;
+    private String serviceURL;
 
     public CmdConsume() {
         // Do nothing
@@ -79,8 +104,10 @@ public class CmdConsume {
      * Set client configuration.
      *
      */
-    public void updateConfig(ClientBuilder clientBuilder) {
+    public void updateConfig(ClientBuilder clientBuilder, Authentication authentication, String serviceURL) {
         this.clientBuilder = clientBuilder;
+        this.authentication = authentication;
+        this.serviceURL = serviceURL;
     }
 
     /**
@@ -117,12 +144,22 @@ public class CmdConsume {
             throw (new ParameterException("Number of messages should be zero or positive."));
 
         String topic = this.mainOptions.get(0);
+        
+        if(this.serviceURL.startsWith("ws")) {
+            return consumeFromWebSocket(topic);
+        }else {
+            return consume(topic);
+        }
+    }
+
+    private int consume(String topic) {
         int numMessagesConsumed = 0;
         int returnCode = 0;
 
         try {
             PulsarClient client = clientBuilder.build();
-            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName).subscriptionType(subscriptionType).subscribe();
+            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
+                    .subscriptionType(subscriptionType).subscribe();
 
             RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
             while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
@@ -151,5 +188,149 @@ public class CmdConsume {
         }
 
         return returnCode;
+
+    }
+
+    @SuppressWarnings("deprecation")
+    private int consumeFromWebSocket(String topic) {
+        int numMessagesConsumed = 0;
+        int returnCode = 0;
+
+        TopicName topicName = TopicName.get(topic);
+        
+        String wsTopic = String.format(
+                "%s/%s/" + (StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/")
+                        + "%s/%s/%s?subscriptionType=%s",
+                topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName(),
+                subscriptionName, subscriptionType.toString());
+        
+        String consumerBaseUri = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/consumer/" + wsTopic;
+        URI consumerUri = URI.create(consumerBaseUri);
+
+        WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
+        ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+        try {
+            if (authentication != null) {
+                authentication.start();
+                AuthenticationDataProvider authData = authentication.getAuthData();
+                if (authData.hasDataForHttp()) {
+                    for (Map.Entry<String, String> kv : authData.getHttpHeaders()) {
+                        produceRequest.setHeader(kv.getKey(), kv.getValue());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Authentication plugin error: " + e.getMessage());
+            return -1;
+        }
+        CompletableFuture<Void> connected = new CompletableFuture<>();
+        ConsumerSocket consumerSocket = new ConsumerSocket(connected);
+        try {
+            produceClient.start();
+        } catch (Exception e) {
+            LOG.error("Failed to start websocket-client", e);
+            return -1;
+        }
+
+        try {
+            LOG.info("Trying to create websocket session..{}",consumerUri);
+            produceClient.connect(consumerSocket, consumerUri, produceRequest);
+            connected.get();
+        } catch (Exception e) {
+            LOG.error("Failed to create web-socket session", e);
+            return -1;
+        }
+
+        try {
+            RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
+            while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+                String msg = consumerSocket.receive(5, TimeUnit.SECONDS);
+                if (msg == null) {
+                    LOG.debug("No message to consume after waiting for 5 seconds.");
+                } else {
+                    try {
+                        System.out.println(Base64.getDecoder().decode(msg));    
+                    }catch(Exception e) {
+                        System.out.println(msg);
+                    }
+                    numMessagesConsumed += 1;
+                }
+            }
+            consumerSocket.awaitClose(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Error while consuming messages");
+            LOG.error(e.getMessage(), e);
+            returnCode = -1;
+        } finally {
+            LOG.info("{} messages successfully consumed", numMessagesConsumed);
+        }
+
+        return returnCode;
+    }
+
+    @WebSocket(maxTextMessageSize = 64 * 1024)
+    public static class ConsumerSocket {
+        private static final String X_PULSAR_MESSAGE_ID = "messageId";
+        private final CountDownLatch closeLatch;
+        private Session session;
+        private CompletableFuture<Void> connected;
+        final BlockingQueue<String> incomingMessages;
+
+        public ConsumerSocket(CompletableFuture<Void> connected) {
+            this.closeLatch = new CountDownLatch(1);
+            this.connected = connected;
+            this.incomingMessages = new GrowableArrayBlockingQueue<>();
+        }
+
+        public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
+            return this.closeLatch.await(duration, unit);
+        }
+
+        @OnWebSocketClose
+        public void onClose(int statusCode, String reason) {
+            log.info("Connection closed: {} - {}", statusCode, reason);
+            this.session = null;
+            this.closeLatch.countDown();
+        }
+
+        @OnWebSocketConnect
+        public void onConnect(Session session) throws InterruptedException {
+            log.info("Got connect: {}", session);
+            this.session = session;
+            this.connected.complete(null);
+        }
+
+        @OnWebSocketMessage
+        public synchronized void onMessage(String msg) throws Exception {
+            JsonObject message = new Gson().fromJson(msg, JsonObject.class);
+            JsonObject ack = new JsonObject();
+            String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
+            ack.add("messageId", new JsonPrimitive(messageId));
+            // Acking the proxy
+            this.getRemote().sendString(ack.toString());
+            this.incomingMessages.put(msg);
+        }
+
+        public String receive(long timeout, TimeUnit unit) throws Exception {
+            return incomingMessages.poll(timeout, unit);
+        }
+
+        public RemoteEndpoint getRemote() {
+            return this.session.getRemote();
+        }
+
+        public Session getSession() {
+            return this.session;
+        }
+
+        public void close() {
+            this.session.close();
+        }
+
+        private static final Logger log = LoggerFactory.getLogger(ConsumerSocket.class);
+
     }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index c1eba86..990ac25 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -21,18 +21,44 @@ package org.apache.pulsar.client.cli;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
 
+import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +91,9 @@ public class CmdProduce {
             + "value 0 means to produce messages as fast as possible.")
     private double publishRate = 0;
 
-    ClientBuilder clientBuilder;
+    private ClientBuilder clientBuilder;
+    private Authentication authentication;
+    private String serviceURL;
 
     public CmdProduce() {
         // Do nothing
@@ -75,8 +103,10 @@ public class CmdProduce {
      * Set Pulsar client configuration.
      *
      */
-    public void updateConfig(ClientBuilder newBuilder) {
+    public void updateConfig(ClientBuilder newBuilder, Authentication authentication, String serviceURL) {
         this.clientBuilder = newBuilder;
+        this.authentication = authentication;
+        this.serviceURL = serviceURL;
     }
 
     /*
@@ -114,12 +144,15 @@ public class CmdProduce {
      * @throws Exception
      */
     public int run() throws PulsarClientException {
-        if (mainOptions.size() != 1)
+        if (mainOptions.size() != 1) {
             throw (new ParameterException("Please provide one and only one topic name."));
-        if (this.numTimesProduce <= 0)
+        }
+        if (this.numTimesProduce <= 0) {
             throw (new ParameterException("Number of times need to be positive number."));
-        if (messages.size() == 0 && messageFileNames.size() == 0)
+        }
+        if (messages.size() == 0 && messageFileNames.size() == 0) {
             throw (new ParameterException("Please supply message content with either --messages or --files"));
+        }
 
         int totalMessages = (messages.size() + messageFileNames.size()) * numTimesProduce;
         if (totalMessages > MAX_MESSAGES) {
@@ -129,6 +162,15 @@ public class CmdProduce {
         }
 
         String topic = this.mainOptions.get(0);
+
+        if (this.serviceURL.startsWith("ws")) {
+            return publishToWebSocket(totalMessages, topic);
+        } else {
+            return publish(totalMessages, topic);
+        }
+    }
+
+    private int publish(int totalMessages, String topic) {
         int numMessagesSent = 0;
         int returnCode = 0;
 
@@ -159,4 +201,140 @@ public class CmdProduce {
 
         return returnCode;
     }
+
+    @SuppressWarnings("deprecation")
+    private int publishToWebSocket(int totalMessages, String topic) {
+        int numMessagesSent = 0;
+        int returnCode = 0;
+
+        TopicName topicName = TopicName.get(topic);
+        String wsTopic = String.format("%s/%s/"+(StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster()+"/")+"%s/%s", topicName.getDomain(),topicName.getTenant(),topicName.getNamespacePortion(),topicName.getLocalName());

+        String produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/producer/" + wsTopic;
+        URI produceUri = URI.create(produceBaseEndPoint);
+
+        WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
+        ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+        try {
+            if (authentication != null) {
+                authentication.start();
+                AuthenticationDataProvider authData = authentication.getAuthData();
+                if (authData.hasDataForHttp()) {
+                    for (Map.Entry<String, String> kv : authData.getHttpHeaders()) {
+                        produceRequest.setHeader(kv.getKey(), kv.getValue());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Authentication plugin error: " + e.getMessage());
+            return -1;
+        }
+
+        CompletableFuture<Void> connected = new CompletableFuture<>();
+        ProducerSocket produceSocket = new ProducerSocket(connected);
+        try {
+            produceClient.start();
+        } catch (Exception e) {
+            LOG.error("Failed to start websocket-client", e);
+            return -1;
+        }
+
+        try {
+            LOG.info("Trying to create websocket session.. on {},{}", produceUri, produceRequest);
+            produceClient.connect(produceSocket, produceUri, produceRequest);
+            connected.get();
+        } catch (Exception e) {
+            LOG.error("Failed to create web-socket session", e);
+            return -1;
+        }
+
+        try {
+            List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
+            RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;
+            for (int i = 0; i < this.numTimesProduce; i++) {
+                int index = i * 10;
+                for (byte[] content : messageBodies) {
+                    if (limiter != null) {
+                        limiter.acquire();
+                    }
+                    produceSocket.send(index++, content).get(30,TimeUnit.SECONDS);
+                    numMessagesSent++;
+                }
+            }
+            produceSocket.close();
+        } catch (Exception e) {
+            LOG.error("Error while producing messages");
+            LOG.error(e.getMessage(), e);
+            returnCode = -1;
+        } finally {
+            LOG.info("{} messages successfully produced", numMessagesSent);
+        }
+
+        return returnCode;
+    }
+
+    @WebSocket(maxTextMessageSize = 64 * 1024)
+    public static class ProducerSocket {
+
+        private final CountDownLatch closeLatch;
+        private Session session;
+        private CompletableFuture<Void> connected;
+        private volatile CompletableFuture<Void> result;
+
+        public ProducerSocket(CompletableFuture<Void> connected) {
+            this.closeLatch = new CountDownLatch(1);
+            this.connected = connected;
+        }
+
+        public CompletableFuture<Void> send(int index, byte[] content) throws Exception {
+            this.session.getRemote().sendString(getTestJsonPayload(index, content));
+            this.result = new CompletableFuture<>();
+            return result;
+        }
+
+        private static String getTestJsonPayload(int index, byte[] content) throws JsonProcessingException {
+            ProducerMessage msg = new ProducerMessage();
+            msg.payload = Base64.getEncoder().encodeToString(content);
+            msg.key = Integer.toString(index);
+            return ObjectMapperFactory.getThreadLocal().writeValueAsString(msg);
+        }
+
+        public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
+            return this.closeLatch.await(duration, unit);
+        }
+
+        @OnWebSocketClose
+        public void onClose(int statusCode, String reason) {
+            LOG.info("Connection closed: {} - {}", statusCode, reason);
+            this.session = null;
+            this.closeLatch.countDown();
+        }
+
+        @OnWebSocketConnect
+        public void onConnect(Session session) throws Exception {
+            LOG.info("Got connect: {}", session);
+            this.session = session;
+            this.connected.complete(null);
+        }
+
+        @OnWebSocketMessage
+        public synchronized void onMessage(String msg) throws JsonParseException {
+            LOG.info("ack= {}",msg);
+            if(this.result!=null) {
+                this.result.complete(null);
+            }
+        }
+
+        public RemoteEndpoint getRemote() {
+            return this.session.getRemote();
+        }
+
+        public Session getSession() {
+            return this.session;
+        }
+
+        public void close() {
+            this.session.close();
+        }
+
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 95f0d51..122f89d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -27,6 +27,8 @@ import java.util.Arrays;
 import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
@@ -90,14 +92,16 @@ public class PulsarClientTool {
 
     private void updateConfig() throws UnsupportedAuthenticationException, MalformedURLException {
         ClientBuilder clientBuilder = PulsarClient.builder();
+        Authentication authentication = null;
         if (isNotBlank(this.authPluginClassName)) {
-            clientBuilder.authentication(authPluginClassName, authParams);
+            authentication = AuthenticationFactory.create(authPluginClassName, authParams);
+            clientBuilder.authentication(authentication);
         }
         clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
         clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
         clientBuilder.serviceUrl(serviceURL);
-        this.produceCommand.updateConfig(clientBuilder);
-        this.consumeCommand.updateConfig(clientBuilder);
+        this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
+        this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
     }
 
     public int run(String[] args) {


Mime
View raw message