pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: Add a Pulsar IO connector for RabbitMQ sink. (#3967)
Date Tue, 02 Apr 2019 21:44:04 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e8b3d3  Add a Pulsar IO connector for RabbitMQ sink. (#3967)
1e8b3d3 is described below

commit 1e8b3d3e74bf657e399ea49e4a43be1c54afe016
Author: Fangbin Sun <sunfangbin@gmail.com>
AuthorDate: Wed Apr 3 05:43:59 2019 +0800

    Add a Pulsar IO connector for RabbitMQ sink. (#3967)
    
    ### Motivation
    
    Provides a builtin RabbitMQ Sink Connector, in order to persist pulsar messages to a RabbitMQ
queue.
    
    ### Modifications
    
    Add a RabbitMQ Sink and some unit tests.
    
    ### Verifying this change
    
    This change can be verified as follows:
    
    * deploy the RabbitMQ sink connector with configuration file containing the following
fields:
    
    ```
    configs:
        host: "localhost"
        port: "5672"
        virtualHost: "/"
        username: "quest"
        password: "quest"
        queueName: "test_queue"
        connectionName: "test_connection"
        exchangeName: "test_exchange"
        routingKey: "test_routing"
    ```
    * deploy an RabbitMQ cluster and create the above elements
    * send messages in the topic with specified schema declared when deploying the connector
    * use `rabbitmqadmin` or `rabbitmqctl` to query messages in the specified queue
---
 pulsar-io/rabbitmq/pom.xml                         |  16 +++
 .../apache/pulsar/io/rabbitmq/RabbitMQSink.java    | 112 ++++++++++++++++
 .../pulsar/io/rabbitmq/RabbitMQSinkConfig.java     |  75 +++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |   3 +-
 .../io/rabbitmq/sink/RabbitMQSinkConfigTest.java   | 142 +++++++++++++++++++++
 .../pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java  | 101 +++++++++++++++
 .../resources/sinkConfig.yaml}                     |  20 ++-
 site2/docs/io-connectors.md                        |   1 +
 site2/docs/io-rabbitmq.md                          |  24 ++++
 9 files changed, 490 insertions(+), 4 deletions(-)

diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 5c75068..008ee06 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -37,6 +37,22 @@
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-instance</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
new file mode 100644
index 0000000..43a980f
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * A Simple RabbitMQ sink, which transfer records from Pulsar to RabbitMQ.
+ * This class expects records from Pulsar to have values that are stored as bytes or string.
+ */
+@Connector(
+    name = "rabbitmq",
+    type = IOType.SINK,
+    help = "A sink connector is used for moving messages from Pulsar to RabbitMQ.",
+    configClass = RabbitMQSinkConfig.class
+)
+@Slf4j
+public class RabbitMQSink<T> implements Sink<T> {
+
+    private Connection rabbitMQConnection;
+    private Channel rabbitMQChannel;
+    private RabbitMQSinkConfig rabbitMQSinkConfig;
+    private String exchangeName;
+    private String routingKey;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception
{
+        rabbitMQSinkConfig = RabbitMQSinkConfig.load(config);
+        rabbitMQSinkConfig.validate();
+
+        ConnectionFactory connectionFactory = rabbitMQSinkConfig.createConnectionFactory();
+        rabbitMQConnection = connectionFactory.newConnection(rabbitMQSinkConfig.getConnectionName());
+        log.info("A new connection to {}:{} has been opened successfully.",
+            rabbitMQConnection.getAddress().getCanonicalHostName(),
+            rabbitMQConnection.getPort()
+        );
+
+        exchangeName = rabbitMQSinkConfig.getExchangeName();
+        routingKey = rabbitMQSinkConfig.getRoutingKey();
+
+        rabbitMQChannel = rabbitMQConnection.createChannel();
+
+        // several clients share a queue
+        rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
+        rabbitMQChannel.queueDeclare(rabbitMQSinkConfig.getQueueName(), true, false, false,
null);
+        rabbitMQChannel.queueBind(rabbitMQSinkConfig.getQueueName(), exchangeName, routingKey);
+    }
+
+    @Override
+    public void write(Record<T> record) {
+        byte[] value = toBytes(record.getValue());
+        try {
+            rabbitMQChannel.basicPublish(exchangeName, routingKey, null, value);
+            record.ack();
+        } catch (IOException e) {
+            record.fail();
+            log.warn("Failed to publish the message to RabbitMQ ", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (rabbitMQChannel != null) {
+            rabbitMQChannel.close();
+        }
+        if (rabbitMQConnection != null) {
+            rabbitMQConnection.close();
+        }
+    }
+
+    private byte[] toBytes(Object obj) {
+        final byte[] result;
+        if (obj instanceof String) {
+            String s = (String) obj;
+            result = s.getBytes(StandardCharsets.UTF_8);
+        } else if (obj instanceof byte[]) {
+            result = (byte[]) obj;
+        } else {
+            throw new IllegalArgumentException("The value of the record must be String or
Bytes.");
+        }
+        return result;
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
new file mode 100644
index 0000000..4e1c5b1
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The exchange to publish the messages on")
+    private String exchangeName;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The routing key used for publishing the messages")
+    private String routingKey;
+
+    public static RabbitMQSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class);
+    }
+
+    public static RabbitMQSinkConfig load(Map<String, Object> map) throws IOException
{
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQSinkConfig.class);
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkNotNull(exchangeName, "exchangeName property not set.");
+        Preconditions.checkNotNull(routingKey, "routingKey property not set.");
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
index 8be5100..be65345 100644
--- a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -18,5 +18,6 @@
 #
 
 name: rabbitmq
-description: RabbitMQ source connector
+description: RabbitMQ source and sink connector
 sourceClass: org.apache.pulsar.io.rabbitmq.RabbitMQSource
+sinkClass: org.apache.pulsar.io.rabbitmq.RabbitMQSink
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
new file mode 100644
index 0000000..1a4988b
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.sink;
+
+import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * RabbitMQSinkConfig test
+ */
+public class RabbitMQSinkConfigTest {
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(path);
+        assertNotNull(config);
+        assertEquals("localhost", config.getHost());
+        assertEquals(Integer.parseInt("5672"), config.getPort());
+        assertEquals("/", config.getVirtualHost());
+        assertEquals("guest", config.getUsername());
+        assertEquals("guest", config.getPassword());
+        assertEquals("test-queue", config.getQueueName());
+        assertEquals("test-connection", config.getConnectionName());
+        assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+        assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+        assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+        assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+        assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+        assertEquals("test-exchange", config.getExchangeName());
+        assertEquals("test-key", config.getRoutingKey());
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("exchangeName", "test-exchange");
+        map.put("routingKey", "test-key");
+
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        assertNotNull(config);
+        assertEquals("localhost", config.getHost());
+        assertEquals(Integer.parseInt("5672"), config.getPort());
+        assertEquals("/", config.getVirtualHost());
+        assertEquals("guest", config.getUsername());
+        assertEquals("guest", config.getPassword());
+        assertEquals("test-queue", config.getQueueName());
+        assertEquals("test-connection", config.getConnectionName());
+        assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+        assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+        assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+        assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+        assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+        assertEquals("test-exchange", config.getExchangeName());
+        assertEquals("test-key", config.getRoutingKey());
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("exchangeName", "test-exchange");
+        map.put("routingKey", "test-key");
+
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "exchangeName property not set.")
+    public final void missingExchangeValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("routingKey", "test-key");
+
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        config.validate();
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
new file mode 100644
index 0000000..80675d1
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.sink;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.io.rabbitmq.RabbitMQBrokerManager;
+import org.apache.pulsar.io.rabbitmq.RabbitMQSink;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class RabbitMQSinkTest {
+    private RabbitMQBrokerManager rabbitMQBrokerManager;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        rabbitMQBrokerManager = new RabbitMQBrokerManager();
+        rabbitMQBrokerManager.startBroker();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        rabbitMQBrokerManager.stopBroker();
+    }
+
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("host", "localhost");
+        configs.put("port", "5672");
+        configs.put("virtualHost", "default");
+        configs.put("username", "guest");
+        configs.put("password", "guest");
+        configs.put("queueName", "test-queue");
+        configs.put("connectionName", "test-connection");
+        configs.put("requestedChannelMax", "0");
+        configs.put("requestedFrameMax", "0");
+        configs.put("connectionTimeout", "60000");
+        configs.put("handshakeTimeout", "10000");
+        configs.put("requestedHeartbeat", "60");
+        configs.put("exchangeName", "test-exchange");
+        configs.put("routingKey", "test-key");
+
+        RabbitMQSink sink = new RabbitMQSink();
+
+        // open should success
+        sink.open(configs, null);
+
+        // write should success
+        Record<String> record = build("test-topic", "fakeKey", "fakeValue");
+        sink.write(record);
+
+        sink.close();
+    }
+
+    private Record<String> build(String topic, String key, String value) {
+        // prepare a SinkRecord
+        SinkRecord<String> record = new SinkRecord<>(new Record<String>()
{
+            @Override
+            public Optional<String> getKey() {
+                return Optional.empty();
+            }
+
+            @Override
+            public String getValue() {
+                return key;
+            }
+
+            @Override
+            public Optional<String> getDestinationTopic() {
+                if (topic != null) {
+                    return Optional.of(topic);
+                } else {
+                    return Optional.empty();
+                }
+            }
+        }, value);
+        return record;
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
similarity index 68%
copy from pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
copy to pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
index 8be5100..a8c46b8 100644
--- a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
@@ -17,6 +17,20 @@
 # under the License.
 #
 
-name: rabbitmq
-description: RabbitMQ source connector
-sourceClass: org.apache.pulsar.io.rabbitmq.RabbitMQSource
+{
+"host": "localhost",
+"port": "5672",
+"virtualHost": "/",
+"username": "guest",
+"password": "guest",
+"queueName": "test-queue",
+"connectionName": "test-connection",
+"requestedChannelMax": "0",
+"requestedFrameMax": "0",
+"connectionTimeout": "60000",
+"handshakeTimeout": "10000",
+"requestedHeartbeat": "60",
+"exchangeName": "test-exchange",
+"routingKey": "test-key"
+
+}
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 6f94d3c..e36c5e4 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -15,6 +15,7 @@ Pulsar Functions cluster.
 - [Kafka Source Connector](io-kafka.md#source)
 - [Kinesis Sink Connector](io-kinesis.md#sink)
 - [RabbitMQ Source Connector](io-rabbitmq.md#source)
+- [RabbitMQ Sink Connector](io-rabbitmq.md#sink)
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
 - [Netty Source Connector](io-netty.md#source)
diff --git a/site2/docs/io-rabbitmq.md b/site2/docs/io-rabbitmq.md
index 28bd141..5490a46 100644
--- a/site2/docs/io-rabbitmq.md
+++ b/site2/docs/io-rabbitmq.md
@@ -28,3 +28,27 @@ messages to Pulsar topics.
 | `prefetchCount` | `false` | `0` | Maximum number of messages that the server will deliver,
0 for unlimited. |
 | `prefetchGlobal` | `false` | `false` | Set true if the settings should be applied to the
entire channel rather than each consumer. |
 
+## Sink
+
+The RabbitMQ Sink connector is used to pull messages from Pulsar topics and persist the messages
+to a RabbitMQ queue.
+
+### Sink Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `connectionName` | `true` | `null` | The connection name used for connecting to RabbitMQ.
|
+| `host` | `true` | `null` | The RabbitMQ host to connect to. |
+| `port` | `true` | `5672` | The RabbitMQ port to connect to. |
+| `virtualHost` | `true` | `/` | The virtual host used for connecting to RabbitMQ. |
+| `username` | `false` | `guest` | The username used to authenticate to RabbitMQ. |
+| `password` | `false` | `guest` | The password used to authenticate to RabbitMQ. |
+| `queueName` | `true` | `null` | The RabbitMQ queue name from which messages should be read
from or written to. |
+| `requestedChannelMax` | `false` | `0` | Initially requested maximum channel number. 0 for
unlimited. |
+| `requestedFrameMax` | `false` | `0` | Initially requested maximum frame size, in octets.
0 for unlimited. |
+| `connectionTimeout` | `false` | `60000` | Connection TCP establishment timeout in milliseconds.
0 for infinite. |
+| `handshakeTimeout` | `false` | `10000` | The AMQP0-9-1 protocol handshake timeout in milliseconds.
|
+| `requestedHeartbeat` | `false` | `60` | The requested heartbeat timeout in seconds. |
+| `exchangeName` | `true` | `null` | The exchange to publish the messages on. |
+| `routingKey` | `true` | `null` | The routing key used for publishing the messages. |
+


Mime
View raw message