pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [incubator-pulsar] branch master updated: Reintroduce Aerospike connector (#2524)
Date Thu, 06 Sep 2018 14:39:40 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ecd03a  Reintroduce Aerospike connector (#2524)
2ecd03a is described below

commit 2ecd03abbbf6128d59d27a9d151ed47cc7ead4c7
Author: Ali Ahmed <alahmed.se@gmail.com>
AuthorDate: Thu Sep 6 07:39:37 2018 -0700

    Reintroduce Aerospike connector (#2524)
---
 distribution/io/src/assemble/io.xml                |   8 +
 pom.xml                                            |   1 +
 pulsar-io/{ => aerospike}/pom.xml                  |  56 +++++--
 .../pulsar/io/aerospike/AerospikeAbstractSink.java | 169 +++++++++++++++++++++
 .../pulsar/io/aerospike/AerospikeSinkConfig.java   |  64 ++++++++
 .../pulsar/io/aerospike/AerospikeStringSink.java   |  35 +++++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 +++
 pulsar-io/pom.xml                                  |   1 +
 site2/docs/deploy-bare-metal.md                    |   1 +
 site2/docs/getting-started-standalone.md           |   1 +
 site2/docs/io-aerospike.md                         |  21 +++
 site2/docs/io-connectors.md                        |   1 +
 site2/docs/io-overview.md                          |   3 +-
 site2/docs/io-quickstart.md                        |   3 +-
 14 files changed, 368 insertions(+), 18 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index bb75e84..08ff859 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -74,15 +74,23 @@
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
     <file>
       <source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source>
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
     <file>
       <source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
+    <file>
+      <source>${basedir}/../../pulsar-io/aerospike/target/pulsar-io-aerospike-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
   </files>
 </assembly>
diff --git a/pom.xml b/pom.xml
index 681f18e..a35ca81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@ flexible messaging model and an intuitive client API.</description>
     <sketches.version>0.8.3</sketches.version>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra-driver-core.version>3.4.0</cassandra-driver-core.version>
+    <aerospike-client.version>4.1.11</aerospike-client.version>
     <kafka-client.version>0.10.2.1</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
     <aws-sdk.version>1.11.297</aws-sdk.version>
diff --git a/pulsar-io/pom.xml b/pulsar-io/aerospike/pom.xml
similarity index 50%
copy from pulsar-io/pom.xml
copy to pulsar-io/aerospike/pom.xml
index e89cc02..34b2d75 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/aerospike/pom.xml
@@ -19,27 +19,51 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <packaging>pom</packaging>
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
+    <artifactId>pulsar-io</artifactId>
     <version>2.2.0-incubating-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-io</artifactId>
-  <name>Pulsar IO :: Parent</name>
-
-  <modules>
-    <module>core</module>
-    <module>twitter</module>
-    <module>cassandra</module>
-    <module>kafka</module>
-    <module>rabbitmq</module>
-    <module>kinesis</module>
-    <module>jdbc</module>
-    <module>data-genenator</module>
-  </modules>
+  <artifactId>pulsar-io-aerospike</artifactId>
+  <name>Pulsar IO :: Aerospike</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.aerospike</groupId>
+      <artifactId>aerospike-client-bc</artifactId>
+      <version>${aerospike-client.version}</version>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
 
 </project>
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
new file mode 100644
index 0000000..fe3787a
--- /dev/null
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.aerospike;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.AerospikeException;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Host;
+import com.aerospike.client.Key;
+import com.aerospike.client.Value;
+import com.aerospike.client.async.EventLoop;
+import com.aerospike.client.async.EventPolicy;
+import com.aerospike.client.async.NioEventLoops;
+import com.aerospike.client.listener.WriteListener;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.WritePolicy;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Simple abstract class for Aerospike sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AerospikeAbstractSink.class);
+
+    // ----- Runtime fields
+    private AerospikeSinkConfig aerospikeSinkConfig;
+    private AerospikeClient client;
+    private WritePolicy writePolicy;
+    private BlockingQueue<AWriteListener> queue;
+    private NioEventLoops eventLoops;
+    private EventLoop eventLoop;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception
{
+        aerospikeSinkConfig = AerospikeSinkConfig.load(config);
+        if (aerospikeSinkConfig.getSeedHosts() == null
+                || aerospikeSinkConfig.getKeyspace() == null
+                || aerospikeSinkConfig.getColumnName() == null) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        writePolicy = new WritePolicy();
+        writePolicy.maxRetries = aerospikeSinkConfig.getRetries();
+        writePolicy.setTimeout(aerospikeSinkConfig.getTimeoutMs());
+        createClient();
+        queue = new LinkedBlockingDeque<>(aerospikeSinkConfig.getMaxConcurrentRequests());
+        for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests(); ++i) {
+            queue.put(new AWriteListener(queue));
+        }
+
+        eventLoops = new NioEventLoops(new EventPolicy(), 1);
+        eventLoop = eventLoops.next();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (client != null) {
+            client.close();
+        }
+
+        if (eventLoops != null) {
+            eventLoops.close();
+        }
+        LOG.info("Connection Closed");
+    }
+
+    @Override
+    public void write(Record<byte[]> record) {
+        KeyValue<K, V> keyValue = extractKeyValue(record);
+        Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(),
keyValue.getKey().toString());
+        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(keyValue.getValue()));
+        AWriteListener listener = null;
+        try {
+            listener = queue.take();
+        } catch (InterruptedException ex) {
+            record.fail();
+            return;
+        }
+        listener.setContext(record);
+        client.put(eventLoop, listener, writePolicy, key, bin);
+    }
+
+    private void createClient() {
+        String[] hosts = aerospikeSinkConfig.getSeedHosts().split(",");
+        if (hosts.length <= 0) {
+            throw new RuntimeException("Invalid Seed Hosts");
+        }
+        Host[] aeroSpikeHosts = new Host[hosts.length];
+        for (int i = 0; i < hosts.length; ++i) {
+            String[] hostPort = hosts[i].split(":");
+            aeroSpikeHosts[i] = new Host(hostPort[0], Integer.valueOf(hostPort[1]));
+        }
+        ClientPolicy policy = new ClientPolicy();
+        if (aerospikeSinkConfig.getUserName() != null && !aerospikeSinkConfig.getUserName().isEmpty()
+            && aerospikeSinkConfig.getPassword() != null && !aerospikeSinkConfig.getPassword().isEmpty())
{
+            policy.user = aerospikeSinkConfig.getUserName();
+            policy.password = aerospikeSinkConfig.getPassword();
+        }
+        client = new AerospikeClient(policy, aeroSpikeHosts);
+    }
+
+    private class AWriteListener implements WriteListener {
+        private Record<byte[]> context;
+        private BlockingQueue<AWriteListener> queue;
+
+        public AWriteListener(BlockingQueue<AWriteListener> queue) {
+            this.queue = queue;
+        }
+
+        public void setContext(Record<byte[]> record) {
+            this.context = record;
+        }
+
+        @Override
+        public void onSuccess(Key key) {
+            if (context != null) {
+                context.ack();
+            }
+            try {
+                queue.put(this);
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Interrupted while being added to the queue" ,ex);
+            }
+        }
+
+        @Override
+        public void onFailure(AerospikeException e) {
+            if (context != null) {
+                context.fail();
+            }
+            try {
+                queue.put(this);
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Interrupted while being added to the queue",
ex);
+            }
+        }
+    }
+
+    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
+}
\ No newline at end of file
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
new file mode 100644
index 0000000..931d280
--- /dev/null
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.aerospike;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class AerospikeSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String seedHosts;
+    private String keyspace;
+    private String columnName;
+
+    // Optional
+    private String userName;
+    private String password;
+    private String keySet;
+    private int maxConcurrentRequests = 100;
+    private int timeoutMs = 100;
+    private int retries = 1;
+
+
+    public static AerospikeSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
+    }
+
+    public static AerospikeSinkConfig load(Map<String, Object> map) throws IOException
{
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), AerospikeSinkConfig.class);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
new file mode 100644
index 0000000..bac07a0
--- /dev/null
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.aerospike;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * Aerospike sink that treats incoming messages on the input topic as Strings
+ * and write identical key/value pairs.
+ */
+public class AerospikeStringSink extends AerospikeAbstractSink<String, String> {
+    @Override
+    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+        String key = record.getKey().orElseGet(() -> new String(record.getValue()));
+        return new KeyValue<>(key, new String(record.getValue()));
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..f2a7ab5
--- /dev/null
+++ b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: aerospike
+description: Aerospike database sink
+sinkClass: org.apache.pulsar.io.aerospike.AerospikeStringSink
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index e89cc02..92f2186 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -35,6 +35,7 @@
     <module>core</module>
     <module>twitter</module>
     <module>cassandra</module>
+    <module>aerospike</module>
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>kinesis</module>
diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md
index a65a45a..a438f92 100644
--- a/site2/docs/deploy-bare-metal.md
+++ b/site2/docs/deploy-bare-metal.md
@@ -125,6 +125,7 @@ $ tar xvfz apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
 pulsar-io-cassandra-{{pulsar:version}}.nar
 pulsar-io-kafka-{{pulsar:version}}.nar
 pulsar-io-kinesis-{{pulsar:version}}.nar
diff --git a/site2/docs/getting-started-standalone.md b/site2/docs/getting-started-standalone.md
index 944eee0..3f95cdd 100644
--- a/site2/docs/getting-started-standalone.md
+++ b/site2/docs/getting-started-standalone.md
@@ -87,6 +87,7 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ cd apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
 pulsar-io-cassandra-{{pulsar:version}}.nar
 pulsar-io-kafka-{{pulsar:version}}.nar
 pulsar-io-kinesis-{{pulsar:version}}.nar
diff --git a/site2/docs/io-aerospike.md b/site2/docs/io-aerospike.md
new file mode 100644
index 0000000..b23e2e3
--- /dev/null
+++ b/site2/docs/io-aerospike.md
@@ -0,0 +1,21 @@
+---
+id: io-aerospike
+title: Aerospike Sink Connector
+sidebar_label: Aerospike Sink Connector
+---
+
+The Aerospike Sink connector is used to write messages to an Aerospike Cluster.
+
+## Sink Configuration Options
+
+The following configuration options are specific to the Aerospike Connector:
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `seedHosts` | `true` | `null` | Comma seperated list of one or more Aerospike cluster hosts;
each host can be specified as a valid IP address or hostname followed by an optional port
number (default is 3000). | 
+| `keyspace` | `true` | `null` | Aerospike namespace to use. |
+| `keySet` | `false` | `null` | Aerospike set name to use. |
+| `columnName` | `true` | `null` | Aerospike bin name to use. |
+| `maxConcurrentRequests` | `false` | `100` | Maximum number of concurrent Aerospike transactions
that a Sink can open. |
+| `timeoutMs` | `false` | `100` | A single timeout value controls `socketTimeout` and `totalTimeout`
for Aerospike transactions.  |
+| `retries` | `false` | `1` | Maximum number of retries before aborting a write transaction
to Aerospike. |
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 6f7d4b3..5a76998 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -9,6 +9,7 @@ These connectors import and export data from some of the most commonly used
data
 as easy as writing a simple connector configuration and running the connector locally or
submitting the connector to a
 Pulsar Functions cluster.
 
+- [Aerospike Sink Connector](io-aerospike.md)
 - [Cassandra Sink Connector](io-cassandra.md)
 - [Kafka Sink Connector](io-kafka.md#sink)
 - [Kafka Source Connector](io-kafka.md#source)
diff --git a/site2/docs/io-overview.md b/site2/docs/io-overview.md
index be8792c..0c55716 100644
--- a/site2/docs/io-overview.md
+++ b/site2/docs/io-overview.md
@@ -4,7 +4,7 @@ title: Pulsar IO Overview
 sidebar_label: Overview
 ---
 
-Messaging systems are most powerful when you can easily use them in conjunction with external
systems like databases and other messaging systems. **Pulsar IO** is a feature of Pulsar that
enables you to easily create, deploy, and manage Pulsar **connectors** that interact with
external systems, such as [Apache Cassandra](https://cassandra.apache.org), and many others.
+Messaging systems are most powerful when you can easily use them in conjunction with external
systems like databases and other messaging systems. **Pulsar IO** is a feature of Pulsar that
enables you to easily create, deploy, and manage Pulsar **connectors** that interact with
external systems, such as [Apache Cassandra](https://cassandra.apache.org), [Aerospike](https://www.aerospike.com),
and many others.
 
 > #### Pulsar IO and Pulsar Functions
 > Under the hood, Pulsar IO connectors are specialized [Pulsar Functions](functions-overview.md)
purpose-built to interface with external systems. The [administrative interface](io-quickstart.md)
for Pulsar IO is, in fact, quite similar to that of Pulsar Functions.
@@ -30,6 +30,7 @@ The following connectors are currently available for Pulsar:
 
 |Name|Java Class|Documentation|
 |---|---|---|
+|[Aerospike sink](https://www.aerospike.com/)|[`org.apache.pulsar.io.aerospike.AerospikeSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java)|[Documentation](io-aerospike.md)|
 |[Cassandra sink](https://cassandra.apache.org)|[`org.apache.pulsar.io.cassandra.CassandraSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java)|[Documentation](io-cassandra.md)|
 |[Kafka source](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSource`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java)|[Documentation](io-kafka.md#source)|
 |[Kafka sink](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java)|[Documentation](io-kafka.md#sink)|
diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md
index 4b40f6b..8b8cfd3 100644
--- a/site2/docs/io-quickstart.md
+++ b/site2/docs/io-quickstart.md
@@ -69,6 +69,7 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
 $ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
 
 $ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
 pulsar-io-cassandra-{{pulsar:version}}.nar
 pulsar-io-kafka-{{pulsar:version}}.nar
 pulsar-io-kinesis-{{pulsar:version}}.nar
@@ -122,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
 
 Example output:
 ```json
-[{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ
source connector","sour [...]
+[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
sink connect [...]
 ```
 
 If an error occurred while starting Pulsar service, you may be able to seen exception at
the terminal you are running `pulsar/standalone`,


Mime
View raw message