pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #3269: Release notes and website update for 2.2.1
Date Mon, 31 Dec 2018 18:58:32 GMT
merlimat closed pull request #3269: Release notes and website update for 2.2.1
URL: https://github.com/apache/pulsar/pull/3269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/site2/website/release-notes.md b/site2/website/release-notes.md
index 14ca292d5c..87e405d63a 100644
--- a/site2/website/release-notes.md
+++ b/site2/website/release-notes.md
@@ -1,30 +1,49 @@
 
-## Apache 
+## Apache
+
+### 2.2.1 &mdash; 2018-12-31 <a id="2.2.1"></a>
+
+This release includes fixes for the 2.2.0 release. In particular:
+
+* Fixed issue when proxying HTTP admin API requests through the Pulsar proxy [#3022](https://github.com/apache/pulsar/pull/3022)
+
+* Fixed `Consumer.unsubscribe()` in the Python client library [#3093](https://github.com/apache/pulsar/pull/3093)
+
+* Fixed ZLib decompression in the C++ client [#2854](https://github.com/apache/pulsar/pull/2854)
+
+* Fixed Pulsar Functions context publish in Python [#2844](https://github.com/apache/pulsar/pull/2844)
+
+For a complete list of issues fixed, see
+
+https://github.com/apache/pulsar/milestone/19?closed=1
+
+https://github.com/apache/incubator-pulsar/releases/tag/v2.2.1
+
 
 ### 2.2.0 &mdash; 2018-10-24 <a id="2.2.0"></a>
 
-This is the first release of Pulsar as an Apache Top Level Project 
+This is the first release of Pulsar as an Apache Top Level Project
 
 This is a feature release, including several new features, improvements, and fixes for issues reported for 2.1.1-incubating.
  
 * [Pulsar Java Client Interceptors](https://github.com/apache/pulsar/pull/2471)
 
-* [Integration of functions and io with schema registry](https://github.com/apache/pulsar/pull/2266) 
+* [Integration of functions and io with schema registry](https://github.com/apache/pulsar/pull/2266)
 
-* [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) 
+* [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic)
 
-* [Flink Source connector](https://github.com/apache/pulsar/pull/2555) 
+* [Flink Source connector](https://github.com/apache/pulsar/pull/2555)
 
-* [JDBC Sink Connector](https://github.com/apache/pulsar/issues/2313) 
+* [JDBC Sink Connector](https://github.com/apache/pulsar/issues/2313)
 
-* [HDFS Sink Connector](https://github.com/apache/pulsar/pull/2409) 
+* [HDFS Sink Connector](https://github.com/apache/pulsar/pull/2409)
 
-* [Google Cloud Storage Offloader](https://github.com/apache/pulsar/issues/2067) 
+* [Google Cloud Storage Offloader](https://github.com/apache/pulsar/issues/2067)
 
-* [Pulsar SQL](https://github.com/apache/pulsar/wiki/PIP-19:-Pulsar-SQL) 
+* [Pulsar SQL](https://github.com/apache/pulsar/wiki/PIP-19:-Pulsar-SQL)
 
 
-For a complete list of issues fixed, see 
+For a complete list of issues fixed, see
 
 https://github.com/apache/pulsar/milestone/16?closed=1
 
@@ -68,7 +87,7 @@ It is a feature release, including several new features and major improvements:
   - [Kafka Connector](/docs/en/io-kafka/)
   - [Kinesis Connector](/docs/en/io-kinesis/)
   - [RabbitMQ Connector](/docs/en/io-rabbitmq/)
-  - [Twitter Firehose Connector](/docs/en/io-twitter/) 
+  - [Twitter Firehose Connector](/docs/en/io-twitter/)
 - [Tiered Storage](/docs/en/concepts-tiered-storage/): An extension in Pulsar segment store to offload older segments into long term storage (e.g. HDFS, S3).
   S3 support is supported in 2.1 release.
 - [Stateful function](/docs/en/functions-state/): Pulsar Functions is able to use [State API](/docs/en/functions-state#api) for storing state within Pulsar.
diff --git a/site2/website/releases.json b/site2/website/releases.json
index 8286738e05..cf529dbd36 100644
--- a/site2/website/releases.json
+++ b/site2/website/releases.json
@@ -1,4 +1,5 @@
 [
+  "2.2.1",
   "2.2.0",
   "2.1.1-incubating",
   "2.1.0-incubating",
diff --git a/site2/website/versioned_docs/version-2.2.1/admin-api-persistent-topics.md b/site2/website/versioned_docs/version-2.2.1/admin-api-persistent-topics.md
new file mode 100644
index 0000000000..4f7307416d
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/admin-api-persistent-topics.md
@@ -0,0 +1,623 @@
+---
+id: version-2.2.1-admin-api-persistent-topics
+title: Managing persistent topics
+sidebar_label: Persistent topics
+original_id: admin-api-persistent-topics
+---
+
+A persistent topic is a logical endpoint for publishing and consuming messages. Producers publish messages to the topic, and consumers subscribe to the topic to consume the messages published to it.
+
+In all of the instructions and commands below, the topic name structure is:
+
+
+```shell
+persistent://tenant/namespace/topic
+```
+
+## Persistent topics resources
+
+### List of topics
+
+It provides a list of the persistent topics that exist under a given namespace.
+
+#### pulsar-admin
+
+The list of topics can be fetched using the [`list`](../../reference/CliTools#list) command.
+
+```shell
+$ pulsar-admin persistent list \
+  my-tenant/my-namespace
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace|operation/getList}
+
+#### Java
+
+```java
+String namespace = "my-tenant/my-namespace";
+admin.persistentTopics().getList(namespace);
+```
+
+### Grant permission
+
+It grants a client role permissions to perform specific actions on a given topic.
+
+#### pulsar-admin
+
+Permissions can be granted using the [`grant-permission`](../../reference/CliTools#grant-permission) command.
+
+```shell
+$ pulsar-admin persistent grant-permission \
+  --actions produce,consume --role application1 \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role|operation/grantPermissionsOnTopic}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+String role = "test-role";
+Set<AuthAction> actions  = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
+admin.persistentTopics().grantPermission(topic, role, actions);
+```
+
+### Get permission
+
+It shows the permissions granted on a given topic.
+
+#### pulsar-admin
+
+Permissions can be fetched using the [`permissions`](../../reference/CliTools#permissions) command.
+
+```shell
+$ pulsar-admin persistent permissions \
+  persistent://test-tenant/ns1/tp1
+
+{
+    "application1": [
+        "consume",
+        "produce"
+    ]
+}
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/permissions|operation/getPermissionsOnTopic}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().getPermissions(topic);
+```
+
+### Revoke permission
+
+It revokes a permission that was granted to a client role.
+
+#### pulsar-admin
+
+Permissions can be revoked using the [`revoke-permission`](../../reference/CliTools#revoke-permission) command.
+
+```shell
+$ pulsar-admin persistent revoke-permission \
+  --role application1 \
+  persistent://test-tenant/ns1/tp1
+
+{
+  "application1": [
+    "consume",
+    "produce"
+  ]
+}
+```
+
+#### REST API
+
+{@inject: endpoint|DELETE|/admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role|operation/revokePermissionsOnTopic}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+String role = "test-role";
+admin.persistentTopics().revokePermissions(topic, role);
+```
+
+### Delete topic
+
+It deletes a topic. The topic cannot be deleted if there are any active subscriptions or producers connected to it.
+
+#### pulsar-admin
+
+A topic can be deleted using the [`delete`](../../reference/CliTools#delete) command.
+
+```shell
+$ pulsar-admin persistent delete \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|DELETE|/admin/v2/persistent/:tenant/:namespace/:topic|operation/deleteTopic}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().delete(topic);
+```
+
+### Unload topic
+
+It unloads a topic.
+
+#### pulsar-admin
+
+A topic can be unloaded using the [`unload`](../../reference/CliTools#unload) command.
+
+```shell
+$ pulsar-admin persistent unload \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|PUT|/admin/v2/persistent/:tenant/:namespace/:topic/unload|operation/unloadTopic}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().unload(topic);
+```
+
+### Get stats
+
+It shows current statistics of a given non-partitioned topic.
+
+  -   **msgRateIn**: The sum of all local and replication publishers' publish rates in messages per second
+
+  -   **msgThroughputIn**: Same as above, but in bytes per second instead of messages per second
+
+  -   **msgRateOut**: The sum of all local and replication consumers' dispatch rates in messages per second
+
+  -   **msgThroughputOut**: Same as above, but in bytes per second instead of messages per second
+
+  -   **averageMsgSize**: The average size in bytes of messages published within the last interval
+
+  -   **storageSize**: The sum of the ledgers' storage size for this topic
+
+  -   **publishers**: The list of all local publishers into the topic. There can be zero or thousands
+
+      -   **averageMsgSize**: Average message size in bytes from this publisher within the last interval
+
+      -   **producerId**: Internal identifier for this producer on this topic
+
+      -   **producerName**: Internal identifier for this producer, generated by the client library
+
+      -   **address**: IP address and source port for the connection of this producer
+
+      -   **connectedSince**: Timestamp this producer was created or last reconnected
+
+  -   **subscriptions**: The list of all local subscriptions to the topic
+
+      -   **my-subscription**: The name of this subscription (client defined)
+
+          -   **msgBacklog**: The count of messages in backlog for this subscription
+
+          -   **type**: This subscription type
+
+          -   **msgRateExpired**: The rate at which messages were discarded instead of dispatched from this subscription due to TTL
+
+          -   **consumers**: The list of connected consumers for this subscription
+
+              -   **consumerName**: Internal identifier for this consumer, generated by the client library
+
+              -   **availablePermits**: The number of messages this consumer has space for in the client library's listen queue. A value of 0 means the client library's queue is full and receive() isn't being called. A nonzero value means this consumer is ready to be dispatched messages.
+
+  -   **replication**: This section gives the stats for cross-colo replication of this topic
+
+      -   **replicationBacklog**: The outbound replication backlog in messages
+
+      -   **connected**: Whether the outbound replicator is connected
+
+      -   **replicationDelayInSeconds**: How long the oldest message has been waiting to be sent through the connection, if connected is true
+
+      -   **inboundConnection**: The IP and port of the broker in the remote cluster's publisher connection to this broker
+
+      -   **inboundConnectedSince**: The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.
+
+```json
+{
+  "msgRateIn": 4641.528542257553,
+  "msgThroughputIn": 44663039.74947473,
+  "msgRateOut": 0,
+  "msgThroughputOut": 0,
+  "averageMsgSize": 1232439.816728665,
+  "storageSize": 135532389160,
+  "publishers": [
+    {
+      "msgRateIn": 57.855383881403576,
+      "msgThroughputIn": 558994.7078932219,
+      "averageMsgSize": 613135,
+      "producerId": 0,
+      "producerName": null,
+      "address": null,
+      "connectedSince": null
+    }
+  ],
+  "subscriptions": {
+    "my-topic_subscription": {
+      "msgRateOut": 0,
+      "msgThroughputOut": 0,
+      "msgBacklog": 116632,
+      "type": null,
+      "msgRateExpired": 36.98245516804671,
+      "consumers": []
+    }
+  },
+  "replication": {}
+}
+```
+
+#### pulsar-admin
+
+Topic stats can be fetched using the [`stats`](../../reference/CliTools#stats) command.
+
+```shell
+$ pulsar-admin persistent stats \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/stats|operation/getStats}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().getStats(topic);
+```
+
+### Get internal stats
+
+It shows detailed statistics of a topic.
+
+  -   **entriesAddedCounter**: Messages published since this broker loaded this topic
+
+  -   **numberOfEntries**: Total number of messages being tracked
+
+  -   **totalSize**: Total storage size in bytes of all messages
+
+  -   **currentLedgerEntries**: Count of messages written to the ledger currently open for writing
+
+  -   **currentLedgerSize**: Size in bytes of messages written to the ledger currently open for writing
+
+  -   **lastLedgerCreatedTimestamp**: Time when the last ledger was created
+
+  -   **lastLedgerCreationFailureTimestamp**: Time when the last ledger creation failed
+
+  -   **waitingCursorsCount**: How many cursors are "caught up" and waiting for a new message to be published
+
+  -   **pendingAddEntriesCount**: How many messages have (asynchronous) write requests awaiting completion
+
+  -   **lastConfirmedEntry**: The ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet.
+
+  -   **state**: The state of this ledger for writing. LedgerOpened means we have a ledger open for saving published messages.
+
+  -   **ledgers**: The ordered list of all ledgers for this topic holding its messages
+
+  -   **cursors**: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
+
+      -   **markDeletePosition**: The ack position: the last message the subscriber acknowledged receiving
+
+      -   **readPosition**: The latest position of the subscriber for reading messages
+
+      -   **waitingReadOp**: This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.
+
+      -   **pendingReadOps**: The counter for how many outstanding read requests to the BookKeepers we have in progress
+
+      -   **messagesConsumedCounter**: Number of messages this cursor has acked since this broker loaded this topic
+
+      -   **cursorLedger**: The ledger being used to persistently store the current markDeletePosition
+
+      -   **cursorLedgerLastEntry**: The last entryid used to persistently store the current markDeletePosition
+
+      -   **individuallyDeletedMessages**: If acks are being done out of order, shows the ranges of messages acked between the markDeletePosition and the read position
+
+      -   **lastLedgerSwitchTimestamp**: The last time the cursor ledger was rolled over
+
+      -   **state**: The state of the cursor ledger: Open means we have a cursor ledger for saving updates of the markDeletePosition.
+
+```json
+{
+    "entriesAddedCounter": 20449518,
+    "numberOfEntries": 3233,
+    "totalSize": 331482,
+    "currentLedgerEntries": 3233,
+    "currentLedgerSize": 331482,
+    "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
+    "lastLedgerCreationFailureTimestamp": null,
+    "waitingCursorsCount": 1,
+    "pendingAddEntriesCount": 0,
+    "lastConfirmedEntry": "324711539:3232",
+    "state": "LedgerOpened",
+    "ledgers": [
+        {
+            "ledgerId": 324711539,
+            "entries": 0,
+            "size": 0
+        }
+    ],
+    "cursors": {
+        "my-subscription": {
+            "markDeletePosition": "324711539:3133",
+            "readPosition": "324711539:3233",
+            "waitingReadOp": true,
+            "pendingReadOps": 0,
+            "messagesConsumedCounter": 20449501,
+            "cursorLedger": 324702104,
+            "cursorLedgerLastEntry": 21,
+            "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
+            "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
+            "state": "Open"
+        }
+    }
+}
+```
+
+
+#### pulsar-admin
+
+Topic internal stats can be fetched using the [`stats-internal`](../../reference/CliTools#stats-internal) command.
+
+```shell
+$ pulsar-admin persistent stats-internal \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/internalStats|operation/getInternalStats}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().getInternalStats(topic);
+```
+
+### Peek messages
+
+It peeks N messages for a specific subscription of a given topic.
+
+#### pulsar-admin
+
+
+```shell
+$ pulsar-admin persistent peek-messages \
+  --count 10 --subscription my-subscription \
+  persistent://test-tenant/ns1/tp1
+
+Message ID: 315674752:0
+Properties:  {  "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451"  }
+msg-payload
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition|operation/peekNthMessage}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+String subName = "my-subscription";
+int numMessages = 1;
+admin.persistentTopics().peekMessages(topic, subName, numMessages);
+```
+
+### Skip messages
+
+It skips N messages for a specific subscription of a given topic.
+
+#### pulsar-admin
+
+
+```shell
+$ pulsar-admin persistent skip \
+  --count 10 --subscription my-subscription \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages|operation/skipMessages}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+String subName = "my-subscription";
+int numMessages = 1;
+admin.persistentTopics().skipMessages(topic, subName, numMessages);
+```
+
+### Skip all messages
+
+It skips all old messages for a specific subscription of a given topic.
+
+#### pulsar-admin
+
+
+```shell
+$ pulsar-admin persistent skip-all \
+  --subscription my-subscription \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip_all|operation/skipAllMessages}
+
+[More info](../../reference/RestApi#/admin/persistent/:tenant/:namespace/:topic/subscription/:subName/skip_all)
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+String subName = "my-subscription";
+admin.persistentTopics().skipAllMessages(topic, subName);
+```
+
+### Reset cursor
+
+It resets a subscription's cursor to the position it was at X minutes earlier. It essentially calculates the time and position of the cursor X minutes ago and resets the cursor to that position.
+
+#### pulsar-admin
+
+
+```shell
+$ pulsar-admin persistent reset-cursor \
+  --subscription my-subscription --time 10 \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp|operation/resetCursor}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+String subName = "my-subscription";
+long timestamp = 2342343L;
+admin.persistentTopics().resetCursor(topic, subName, timestamp);
+```
+
+### Lookup of topic
+
+It locates the broker URL which is serving the given topic.
+
+#### pulsar-admin
+
+
+```shell
+$ pulsar-admin persistent lookup \
+  persistent://test-tenant/ns1/tp1
+
+ "pulsar://broker1.org.com:4480"
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/lookup/v2/topic/persistent/:tenant/:namespace/:topic|/}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.lookups().lookupTopic(topic);
+```
+
+### Get bundle
+
+It gives the range of the bundle which contains the given topic.
+
+#### pulsar-admin
+
+
+```shell
+$ pulsar-admin persistent bundle-range \
+  persistent://test-tenant/ns1/tp1
+
+ "0x00000000_0xffffffff"
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle|/}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.lookups().getBundleRange(topic);
+```
+
+
+### Get subscriptions
+
+It shows all subscription names for a given topic.
+
+#### pulsar-admin
+
+```shell
+$ pulsar-admin persistent subscriptions \
+  persistent://test-tenant/ns1/tp1
+
+ my-subscription
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/subscriptions|operation/getSubscriptions}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().getSubscriptions(topic);
+```
+
+### Unsubscribe
+
+It unsubscribes a subscription that is no longer processing messages.
+
+#### pulsar-admin
+
+
+```shell
+$ pulsar-admin persistent unsubscribe \
+  --subscription my-subscription \
+  persistent://test-tenant/ns1/tp1
+```
+
+#### REST API
+
+{@inject: endpoint|DELETE|/admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName|operation/deleteSubscription}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+String subscriptionName = "my-subscription";
+admin.persistentTopics().deleteSubscription(topic, subscriptionName);
+```
+
+### Last Message Id
+
+It gives the last committed message ID for a persistent topic. It will be available in 2.3.0.
+
+```shell
+pulsar-admin topics last-message-id topic-name
+```
+
+#### REST API
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/lastMessageId|operation/getLastMessageId}
+
+#### Java
+
+```java
+String topic = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().getLastMessageId(topic);
+```
\ No newline at end of file
diff --git a/site2/website/versioned_docs/version-2.2.1/client-libraries-go.md b/site2/website/versioned_docs/version-2.2.1/client-libraries-go.md
new file mode 100644
index 0000000000..abc078bb64
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/client-libraries-go.md
@@ -0,0 +1,463 @@
+---
+id: version-2.2.1-client-libraries-go
+title: The Pulsar Go client
+sidebar_label: Go
+original_id: client-libraries-go
+---
+
+The Pulsar Go client can be used to create Pulsar [producers](#producers), [consumers](#consumers), and [readers](#readers) in Go (aka Golang).
+
+> #### API docs available as well
+> For standard API docs, consult the [Godoc](https://godoc.org/github.com/apache/pulsar/pulsar-client-go/pulsar).
+
+
+## Installation
+
+### Requirements
+
+The Pulsar Go client library is based on the C++ client library. Follow
+the instructions for [C++ library](client-libraries-cpp.md) for installing the binaries
+through [RPM](client-libraries-cpp.md#rpm), [Deb](client-libraries-cpp.md#deb) or [Homebrew packages](client-libraries-cpp.md#macos).
+
+### Installing the Go package
+
+> #### Compatibility Warning
+> The version number of the Go client **must match** the version number of the Pulsar C++ client library.
+
+You can install the `pulsar` library locally using `go get`.  Note that `go get` doesn't support fetching a specific tag - it will always pull in master's version of the Go client.  You'll need a C++ client library that matches master.
+
+```bash
+$ go get -u github.com/apache/pulsar/pulsar-client-go/pulsar
+```
+
+Or you can use [dep](https://github.com/golang/dep) for managing the dependencies.
+
+```bash
+$ dep ensure -add github.com/apache/pulsar/pulsar-client-go/pulsar@v{{pulsar:version}}
+```
+
+Once installed locally, you can import it into your project:
+
+```go
+import "github.com/apache/pulsar/pulsar-client-go/pulsar"
+```
+
+## Connection URLs
+
+To connect to Pulsar using client libraries, you need to specify a [Pulsar protocol](developing-binary-protocol.md) URL.
+
+Pulsar protocol URLs are assigned to specific clusters, use the `pulsar` scheme and have a default port of 6650. Here's an example for `localhost`:
+
+```http
+pulsar://localhost:6650
+```
+
+A URL for a production Pulsar cluster may look something like this:
+
+```http
+pulsar://pulsar.us-west.example.com:6650
+```
+
+If you're using [TLS](security-tls-authentication.md) authentication, the URL will look something like this:
+
+```http
+pulsar+ssl://pulsar.us-west.example.com:6651
+```
+
+## Creating a client
+
+In order to interact with Pulsar, you'll first need a `Client` object. You can create a client object using the `NewClient` function, passing in a `ClientOptions` object (more on configuration [below](#client-configuration)). Here's an example:
+
+
+```go
+import (
+    "log"
+    "runtime"
+
+    "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+    client, err := pulsar.NewClient(pulsar.ClientOptions{
+        URL: "pulsar://localhost:6650",
+        OperationTimeoutSeconds: 5,
+        MessageListenerThreads: runtime.NumCPU(),
+    })
+
+    if err != nil {
+        log.Fatalf("Could not instantiate Pulsar client: %v", err)
+    }
+}
+```
+
+The following configurable parameters are available for Pulsar clients:
+
+Parameter | Description | Default
+:---------|:------------|:-------
+`URL` | The connection URL for the Pulsar cluster. See [above](#urls) for more info |
+`IOThreads` | The number of threads to use for handling connections to Pulsar [brokers](reference-terminology.md#broker) | 1
+`OperationTimeoutSeconds` | The timeout for some Go client operations (creating producers, subscribing to and unsubscribing from [topics](reference-terminology.md#topic)). Retries will occur until this threshold is reached, at which point the operation will fail. | 30
+`MessageListenerThreads` | The number of threads used by message listeners ([consumers](#consumers) and [readers](#readers)) | 1
+`ConcurrentLookupRequests` | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum helps to keep from overloading brokers. You should set values over the default of 5000 only if the client needs to produce and/or subscribe to thousands of Pulsar topics. | 5000
+`Logger` | A custom logger implementation for the client (as a function that takes a log level, file path, line number, and message). All info, warn, and error messages will be routed to this function. | `nil`
+`TLSTrustCertsFilePath` | The file path for the trusted TLS certificate |
+`TLSAllowInsecureConnection` | Whether the client accepts untrusted TLS certificates from the broker | `false`
+`Authentication` | Configure the authentication provider. (default: no authentication). Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")` | `nil`
+`StatsIntervalInSeconds` | The interval (in seconds) at which client stats are published | 60
+
+## Producers
+
+Pulsar producers publish messages to Pulsar topics. You can [configure](#producer-configuration) Go producers using a `ProducerOptions` object. Here's an example:
+
+```go
+producer, err := client.CreateProducer(pulsar.ProducerOptions{
+    Topic: "my-topic",
+})
+
+if err != nil {
+    log.Fatalf("Could not instantiate Pulsar producer: %v", err)
+}
+
+defer producer.Close()
+
+msg := pulsar.ProducerMessage{
+    Payload: []byte("Hello, Pulsar"),
+}
+
+if err := producer.Send(context.Background(), msg); err != nil {
+    log.Fatalf("Producer could not send message: %v", err)
+}
+```
+
+> #### Blocking operation
+> When you create a new Pulsar producer, the operation will block (waiting on a go channel) until either a producer is successfully created or an error is thrown.
+
+
+### Producer operations
+
+Pulsar Go producers have the following methods available:
+
+Method | Description | Return type
+:------|:------------|:-----------
+`Topic()` | Fetches the producer's [topic](reference-terminology.md#topic)| `string`
+`Name()` | Fetches the producer's name | `string`
+`Send(context.Context, ProducerMessage)` | Publishes a [message](#messages) to the producer's topic. This call will block until the message is successfully acknowledged by the Pulsar broker, or an error will be thrown if the timeout set using the `SendTimeout` in the producer's [configuration](#producer-configuration) is exceeded. | `error`
+`SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))` | Publishes a [message](#messages) to the producer's topic asynchronously. The third argument is a callback function that specifies what happens either when the message is acknowledged or an error is thrown. |
+`Close()` | Closes the producer and releases all resources allocated to it. If `Close()` is called then no more messages will be accepted from the publisher. This method will block until all pending publish requests have been persisted by Pulsar. If an error is thrown, no pending writes will be retried. | `error`
+
+Here's a more involved example usage of a producer:
+
+```go
+import (
+    "context"
+    "fmt"
+    "log"
+
+    "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+    // Instantiate a Pulsar client
+    client, err := pulsar.NewClient(pulsar.ClientOptions{
+        URL: "pulsar://localhost:6650",
+    })
+
+    if err != nil { log.Fatal(err) }
+
+    // Use the client to instantiate a producer
+    producer, err := client.CreateProducer(pulsar.ProducerOptions{
+        Topic: "my-topic",
+    })
+
+    if err != nil { log.Fatal(err) }
+
+    ctx := context.Background()
+
+    // Send 10 messages synchronously and 10 messages asynchronously
+    for i := 0; i < 10; i++ {
+        // Create a message
+        msg := pulsar.ProducerMessage{
+            Payload: []byte(fmt.Sprintf("message-%d", i)),
+        }
+
+        // Attempt to send the message
+        if err := producer.Send(ctx, msg); err != nil {
+            log.Fatal(err)
+        }
+
+        // Create a different message to send asynchronously
+        asyncMsg := pulsar.ProducerMessage{
+            Payload: []byte(fmt.Sprintf("async-message-%d", i)),
+        }
+
+        // Attempt to send the message asynchronously and handle the response
+        producer.SendAsync(ctx, asyncMsg, func(msg pulsar.ProducerMessage, err error) {
+            if err != nil { log.Fatal(err) }
+
+            fmt.Printf("Message %s successfully published", msg.ID())
+        })
+    }
+}
+```
+
+### Producer configuration
+
+Parameter | Description | Default
+:---------|:------------|:-------
+`Topic` | The Pulsar [topic](reference-terminology.md#topic) to which the producer will publish messages |
+`Name` | A name for the producer. If you don't explicitly assign a name, Pulsar will automatically generate a globally unique name that you can access later using the `Name()` method.  If you choose to explicitly assign a name, it will need to be unique across *all* Pulsar clusters, otherwise the creation operation will throw an error. |
+`SendTimeout` | When publishing a message to a topic, the producer will wait for an acknowledgment from the responsible Pulsar [broker](reference-terminology.md#broker). If a message is not acknowledged within the threshold set by this parameter, an error will be thrown. If you set `SendTimeout` to -1, the timeout will be set to infinity (and thus removed). Removing the send timeout is recommended when using Pulsar's [message de-duplication](cookbooks-deduplication.md) feature. | 30 seconds
+`MaxPendingMessages` | The maximum size of the queue holding pending messages (i.e. messages waiting to receive an acknowledgment from the [broker](reference-terminology.md#broker)). By default, when the queue is full all calls to the `Send` and `SendAsync` methods will fail *unless* `BlockIfQueueFull` is set to `true`. |
+`MaxPendingMessagesAcrossPartitions` | The maximum total number of pending messages across all the partitions of a topic. When the total exceeds this value, the per-partition limit set by `MaxPendingMessages` is reduced accordingly. |
+`BlockIfQueueFull` | If set to `true`, the producer's `Send` and `SendAsync` methods will block when the outgoing message queue is full rather than failing and throwing an error (the size of that queue is dictated by the `MaxPendingMessages` parameter); if set to `false` (the default), `Send` and `SendAsync` operations will fail and throw a `ProducerQueueIsFullError` when the queue is full. | `false`
+`MessageRoutingMode` | The message routing logic (for producers on [partitioned topics](concepts-architecture-overview.md#partitioned-topics)). This logic is applied only when no key is set on messages. The available options are: round robin (`pulsar.RoundRobinDistribution`, the default), publishing all messages to a single partition (`pulsar.UseSinglePartition`), or a custom partitioning scheme (`pulsar.CustomPartition`). | `pulsar.RoundRobinDistribution`
+`HashingScheme` | The hashing function that determines the partition on which a particular message is published (partitioned topics only). The available options are: `pulsar.JavaStringHash` (the equivalent of `String.hashCode()` in Java), `pulsar.Murmur3_32Hash` (applies the [Murmur3](https://en.wikipedia.org/wiki/MurmurHash) hashing function), or `pulsar.BoostHash` (applies the hashing function from C++'s [Boost](https://www.boost.org/doc/libs/1_62_0/doc/html/hash.html) library) | `pulsar.JavaStringHash`
+`CompressionType` | The message data compression type used by the producer. The available options are [`LZ4`](https://github.com/lz4/lz4) and [`ZLIB`](https://zlib.net/). | No compression
+`MessageRouter` | By default, Pulsar uses a round-robin routing scheme for [partitioned topics](cookbooks-partitioned.md). The `MessageRouter` parameter enables you to specify custom routing logic via a function that takes the Pulsar message and topic metadata as an argument and returns an integer (the index of the partition to publish to), i.e. a function signature of `func(Message, TopicMetadata) int`. |
+
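+To tie several of these parameters together, here's a minimal sketch (untested, using only the option and constant names from the table above) of a producer with a non-default configuration:
+
+```go
+producer, err := client.CreateProducer(pulsar.ProducerOptions{
+    Topic:              "my-topic",
+    Name:               "my-producer",             // must be unique across clusters
+    MaxPendingMessages: 2048,                      // queue for messages awaiting acks
+    BlockIfQueueFull:   true,                      // block Send() rather than failing
+    MessageRoutingMode: pulsar.UseSinglePartition, // all keyless messages on one partition
+})
+
+if err != nil {
+    log.Fatalf("Could not instantiate Pulsar producer: %v", err)
+}
+
+defer producer.Close()
+```
+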
+## Consumers
+
+Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can [configure](#consumer-configuration) Go consumers using a `ConsumerOptions` object. Here's a basic example that uses channels:
+
+```go
+msgChannel := make(chan pulsar.ConsumerMessage)
+
+consumerOpts := pulsar.ConsumerOptions{
+    Topic:            "my-topic",
+    SubscriptionName: "my-subscription-1",
+    Type:             pulsar.Exclusive,
+    MessageChannel:   msgChannel,
+}
+
+consumer, err := client.Subscribe(consumerOpts)
+
+if err != nil {
+    log.Fatalf("Could not establish subscription: %v", err)
+}
+
+defer consumer.Close()
+
+for cm := range msgChannel {
+    msg := cm.Message
+
+    fmt.Printf("Message ID: %s", msg.ID())
+    fmt.Printf("Message value: %s", string(msg.Payload()))
+
+    consumer.Ack(msg)
+}
+```
+
+> #### Blocking operation
+> When you create a new Pulsar consumer, the operation will block (on a go channel) until either a consumer is successfully created or an error is thrown.
+
+
+### Consumer operations
+
+Pulsar Go consumers have the following methods available:
+
+Method | Description | Return type
+:------|:------------|:-----------
+`Topic()` | Returns the consumer's [topic](reference-terminology.md#topic) | `string`
+`Subscription()` | Returns the consumer's subscription name | `string`
+`Unsubscribe()` | Unsubscribes the consumer from the assigned topic. Throws an error if the unsubscribe operation is somehow unsuccessful. | `error`
+`Receive(context.Context)` | Receives a single message from the topic. This method blocks until a message is available. | `(Message, error)`
+`Ack(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) | `error`
+`AckID(MessageID)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) by message ID | `error`
+`AckCumulative(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) *all* the messages in the stream, up to and including the specified message. The `AckCumulative` method will block until the ack has been sent to the broker. After that, the messages will *not* be redelivered to the consumer. Cumulative acking cannot be used with a [shared](concepts-messaging.md#shared) subscription type. | `error`
+`Close()` | Closes the consumer, disabling its ability to receive messages from the broker | `error`
+`RedeliverUnackedMessages()` | Redelivers *all* unacknowledged messages on the topic. In [failover](concepts-messaging.md#failover) mode, this request is ignored if the consumer isn't active on the specified topic; in [shared](concepts-messaging.md#shared) mode, redelivered messages are distributed across all consumers connected to the topic. **Note**: this is a *non-blocking* operation that doesn't throw an error. |
+
+#### Receive example
+
+Here's an example usage of a Go consumer that uses the `Receive()` method to process incoming messages:
+
+```go
+import (
+    "context"
+    "log"
+
+    "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+    // Instantiate a Pulsar client
+    client, err := pulsar.NewClient(pulsar.ClientOptions{
+            URL: "pulsar://localhost:6650",
+    })
+
+    if err != nil { log.Fatal(err) }
+
+    // Use the client object to instantiate a consumer
+    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+        Topic:            "my-golang-topic",
+        SubscriptionName: "sub-1",
+        SubscriptionType: pulsar.Exclusive,
+    })
+
+    if err != nil { log.Fatal(err) }
+
+    defer consumer.Close()
+
+    ctx := context.Background()
+
+    // Listen indefinitely on the topic
+    for {
+        msg, err := consumer.Receive(ctx)
+        if err != nil { log.Fatal(err) }
+
+        // Do something with the message
+
+        consumer.Ack(msg)
+    }
+}
+```
+
+### Consumer configuration
+
+Parameter | Description | Default
+:---------|:------------|:-------
+`Topic` | The Pulsar [topic](reference-terminology.md#topic) on which the consumer will establish a subscription and listen for messages |
+`SubscriptionName` | The subscription name for this consumer |
+`Name` | The name of the consumer |
+`AckTimeout` | The timeout for unacknowledged messages: messages that are not acknowledged within this window are redelivered. A value of 0 disables the ack timeout. | 0
+`SubscriptionType` | Available options are `Exclusive`, `Shared`, and `Failover` | `Exclusive`
+`MessageChannel` | The Go channel used by the consumer. Messages that arrive from the Pulsar topic(s) will be passed to this channel. |
+`ReceiverQueueSize` | Sets the size of the consumer's receiver queue, i.e. the number of messages that can be accumulated by the consumer before the application calls `Receive`. A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization. | 1000
+`MaxTotalReceiverQueueSizeAcrossPartitions` |Set the max total receiver queue size across partitions. This setting will be used to reduce the receiver queue size for individual partitions if the total exceeds this value | 50000
+
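+As an illustration, here's a minimal sketch (untested) combining several of these options; `pulsar.Shared` is assumed to be the constant for the shared subscription type, and the `Type` field name follows the channel-based example above:
+
+```go
+consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+    Topic:             "my-topic",
+    SubscriptionName:  "my-shared-subscription",
+    Type:              pulsar.Shared, // multiple consumers share the message stream
+    ReceiverQueueSize: 2000,          // prefetch more than the default 1000 messages
+})
+
+if err != nil {
+    log.Fatalf("Could not establish subscription: %v", err)
+}
+
+defer consumer.Close()
+```
+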
+## Readers
+
+Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can [configure](#reader-configuration) Go readers using a `ReaderOptions` object. Here's an example:
+
+```go
+reader, err := client.CreateReader(pulsar.ReaderOptions{
+    Topic: "my-golang-topic",
+    StartMessageID: pulsar.LatestMessage,
+})
+```
+
+> #### Blocking operation
+> When you create a new Pulsar reader, the operation will block (on a go channel) until either a reader is successfully created or an error is thrown.
+
+
+### Reader operations
+
+Pulsar Go readers have the following methods available:
+
+Method | Description | Return type
+:------|:------------|:-----------
+`Topic()` | Returns the reader's [topic](reference-terminology.md#topic) | `string`
+`Next(context.Context)` | Receives the next message on the topic (analogous to the `Receive` method for [consumers](#consumer-operations)). This method blocks until a message is available. | `(Message, error)`
+`Close()` | Closes the reader, disabling its ability to receive messages from the broker | `error`
+
+#### "Next" example
+
+Here's an example usage of a Go reader that uses the `Next()` method to process incoming messages:
+
+```go
+import (
+    "context"
+    "log"
+
+    "github.com/apache/pulsar/pulsar-client-go/pulsar"
+)
+
+func main() {
+    // Instantiate a Pulsar client
+    client, err := pulsar.NewClient(pulsar.ClientOptions{
+            URL: "pulsar://localhost:6650",
+    })
+
+    if err != nil { log.Fatalf("Could not create client: %v", err) }
+
+    // Use the client to instantiate a reader
+    reader, err := client.CreateReader(pulsar.ReaderOptions{
+        Topic:          "my-golang-topic",
+        StartMessageID: pulsar.EarliestMessage,
+    })
+
+    if err != nil { log.Fatalf("Could not create reader: %v", err) }
+
+    defer reader.Close()
+
+    ctx := context.Background()
+
+    // Listen on the topic for incoming messages
+    for {
+        msg, err := reader.Next(ctx)
+        if err != nil { log.Fatalf("Error reading from topic: %v", err) }
+
+        // Process the message
+    }
+}
+```
+
+In the example above, the reader begins reading from the earliest available message (specified by `pulsar.EarliestMessage`). The reader can also begin reading from the latest message (`pulsar.LatestMessage`) or some other message ID specified by bytes using the `DeserializeMessageID` function, which takes a byte array and returns a `MessageID` object. Here's an example:
+
+```go
+lastSavedId := // Read the last saved message ID from an external store as []byte
+
+reader, err := client.CreateReader(pulsar.ReaderOptions{
+    Topic:          "my-golang-topic",
+    StartMessageID: pulsar.DeserializeMessageID(lastSavedId),
+})
+```
+
+### Reader configuration
+
+Parameter | Description | Default
+:---------|:------------|:-------
+`Topic` | The Pulsar [topic](reference-terminology.md#topic) on which the reader will establish a subscription and listen for messages |
+`Name` | The name of the reader |
+`StartMessageID` | The initial reader position, i.e. the message at which the reader begins processing messages. The options are `pulsar.EarliestMessage` (the earliest available message on the topic), `pulsar.LatestMessage` (the latest available message on the topic), or a `MessageID` object for a position that isn't earliest or latest. |
+`MessageChannel` | The Go channel used by the reader. Messages that arrive from the Pulsar topic(s) will be passed to this channel. |
+`ReceiverQueueSize` | Sets the size of the reader's receiver queue, i.e. the number of messages that can be accumulated by the reader before the application calls `Next`. A value higher than the default of 1000 could increase reader throughput, though at the expense of more memory utilization. | 1000
+`SubscriptionRolePrefix` | The subscription role prefix. | `reader`
+
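+As a brief sketch (untested, using only the options from the table above), here's a reader with a larger receiver queue:
+
+```go
+reader, err := client.CreateReader(pulsar.ReaderOptions{
+    Topic:             "my-golang-topic",
+    Name:              "my-reader",
+    StartMessageID:    pulsar.EarliestMessage,
+    ReceiverQueueSize: 2000, // prefetch more than the default 1000 messages
+})
+
+if err != nil {
+    log.Fatalf("Could not create reader: %v", err)
+}
+
+defer reader.Close()
+```
+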
+## Messages
+
+The Pulsar Go client provides a `ProducerMessage` type that you can use to construct messages to produce on Pulsar topics. Here's an example message:
+
+```go
+msg := pulsar.ProducerMessage{
+    Payload: []byte("Here is some message data"),
+    Key: "message-key",
+    Properties: map[string]string{
+        "foo": "bar",
+    },
+    EventTime: time.Now(),
+    ReplicationClusters: []string{"cluster1", "cluster3"},
+}
+
+if err := producer.Send(context.Background(), msg); err != nil {
+    log.Fatalf("Could not publish message due to: %v", err)
+}
+```
+
+The following parameters are available for `ProducerMessage` objects:
+
+Parameter | Description
+:---------|:-----------
+`Payload` | The actual data payload of the message
+`Key` | The optional key associated with the message (particularly useful for things like topic compaction)
+`Properties` | A key-value map (both keys and values must be strings) for any application-specific metadata attached to the message
+`EventTime` | The timestamp associated with the message
+`ReplicationClusters` | The clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default.
+
+## TLS encryption and authentication
+
+In order to use [TLS encryption](security-tls-transport.md), you'll need to configure your client to do so:
+
+ * Use `pulsar+ssl` URL type
+ * Set `TLSTrustCertsFilePath` to the path to the TLS certs used by your client and the Pulsar broker
+ * Configure `Authentication` option
+
+Here's an example:
+
+```go
+opts := pulsar.ClientOptions{
+    URL: "pulsar+ssl://my-cluster.com:6651",
+    TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
+    Authentication: pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
+}
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/client-libraries-java.md b/site2/website/versioned_docs/version-2.2.1/client-libraries-java.md
new file mode 100644
index 0000000000..80baa4d9d0
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/client-libraries-java.md
@@ -0,0 +1,468 @@
+---
+id: version-2.2.1-client-libraries-java
+title: The Pulsar Java client
+sidebar_label: Java
+original_id: client-libraries-java
+---
+
+The Pulsar Java client can be used both to create Java producers, consumers, and [readers](#readers) of messages and to perform [administrative tasks](admin-api-overview.md). The current version of the Java client is **{{pulsar:version}}**.
+
+Javadoc for the Pulsar client is divided up into two domains, by package:
+
+Package | Description | Maven Artifact
+:-------|:------------|:--------------
+[`org.apache.pulsar.client.api`](/api/client) | The producer and consumer API | [org.apache.pulsar:pulsar-client:{{pulsar:version}}](http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C{{pulsar:version}}%7Cjar)
+[`org.apache.pulsar.client.admin`](/api/admin) | The Java [admin API](admin-api-overview.md) | [org.apache.pulsar:pulsar-client-admin:{{pulsar:version}}](http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client-admin%7C{{pulsar:version}}%7Cjar)
+
+This document will focus only on the client API for producing and consuming messages on Pulsar topics. For a guide to using the Java admin client, see [The Pulsar admin interface](admin-api-overview.md).
+
+## Installation
+
+The latest version of the Pulsar Java client library is available via [Maven Central](http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C{{pulsar:version}}%7Cjar). To use the latest version, add the `pulsar-client` library to your build configuration.
+
+### Maven
+
+If you're using Maven, add this to your `pom.xml`:
+
+```xml
+<!-- in your <properties> block -->
+<pulsar.version>{{pulsar:version}}</pulsar.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-client</artifactId>
+  <version>${pulsar.version}</version>
+</dependency>
+```
+
+### Gradle
+
+If you're using Gradle, add this to your `build.gradle` file:
+
+```groovy
+def pulsarVersion = '{{pulsar:version}}'
+
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
+}
+```
+
+## Connection URLs
+
+To connect to Pulsar using client libraries, you need to specify a [Pulsar protocol](developing-binary-protocol.md) URL.
+
+Pulsar protocol URLs are assigned to specific clusters, use the `pulsar` scheme and have a default port of 6650. Here's an example for `localhost`:
+
+```http
+pulsar://localhost:6650
+```
+
+A URL for a production Pulsar cluster may look something like this:
+
+```http
+pulsar://pulsar.us-west.example.com:6650
+```
+
+If you're using [TLS](security-tls-authentication.md) authentication, the URL will look something like this:
+
+```http
+pulsar+ssl://pulsar.us-west.example.com:6651
+```
+
+## Client configuration
+
+You can instantiate a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object using just a URL for the target Pulsar [cluster](reference-terminology.md#cluster), like this:
+
+```java
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+```
+
+> #### Default broker URLs for standalone clusters
+> If you're running a cluster in [standalone mode](getting-started-standalone.md), the broker will be available at the `pulsar://localhost:6650` URL by default.
+
+Check out the Javadoc for the {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} class for a full listing of configurable parameters.
+
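+For instance, here's a sketch of a client with a few non-default settings (the `ioThreads`, `listenerThreads`, and `operationTimeout` builder methods are assumed from the `ClientBuilder` interface):
+
+```java
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .ioThreads(4)                           // threads for broker connections
+        .listenerThreads(4)                     // threads for message listeners
+        .operationTimeout(30, TimeUnit.SECONDS) // timeout for client operations
+        .build();
+```
+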
+> In addition to client-level configuration, you can also apply [producer](#configuring-producers) and [consumer](#configuring-consumers) specific configuration, as you'll see in the sections below.
+
+
+## Producers
+
+In Pulsar, producers write messages to topics. Once you've instantiated a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object (as in the section [above](#client-configuration)), you can create a {@inject: javadoc:Producer:/client/org/apache/pulsar/client/api/Producer} for a specific Pulsar [topic](reference-terminology.md#topic).
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic("my-topic")
+        .create();
+
+// You can then send messages to the broker and topic you specified:
+producer.send("My message".getBytes());
+```
+
+By default, producers produce messages that consist of byte arrays. You can produce different types, however, by specifying a message [schema](#schemas).
+
+```java
+Producer<String> stringProducer = client.newProducer(Schema.STRING)
+        .topic("my-topic")
+        .create();
+stringProducer.send("My message");
+```
+
+> You should always make sure to close your producers, consumers, and clients when they are no longer needed:
+> ```java
+> producer.close();
+> consumer.close();
+> client.close();
+> ```
+>
+> Close operations can also be asynchronous:
+> ```java
+> producer.closeAsync()
+>    .thenRun(() -> System.out.println("Producer closed"))
+>    .exceptionally((ex) -> {
+>        System.err.println("Failed to close producer: " + ex);
+>        return null;
+>    });
+> ```
+
+### Configuring producers
+
+If you instantiate a `Producer` object specifying only a topic name, as in the example above, the producer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set. For a full listing, see the Javadoc for the {@inject: javadoc:ProducerBuilder:/client/org/apache/pulsar/client/api/ProducerBuilder} class. Here's an example:
+
+```java
+Producer<byte[]> producer = client.newProducer()
+    .topic("my-topic")
+    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
+    .sendTimeout(10, TimeUnit.SECONDS)
+    .blockIfQueueFull(true)
+    .create();
+```
+
+### Message routing
+
+When using partitioned topics, you can specify the routing mode whenever you publish messages using a producer. For more on specifying a routing mode using the Java client, see the [Partitioned Topics](cookbooks-partitioned.md) cookbook.
+
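+As a brief sketch, a routing mode can also be set directly on the producer builder, using the `MessageRoutingMode` enum from `org.apache.pulsar.client.api`:
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic("my-partitioned-topic")
+        // Route all keyless messages to a single, randomly chosen partition
+        .messageRoutingMode(MessageRoutingMode.SinglePartition)
+        .create();
+```
+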
+### Async send
+
+You can also publish messages [asynchronously](concepts-messaging.md#send-modes) using the Java client. With async send, the producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full (max size configurable), the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer.
+
+Here's an example async send operation:
+
+```java
+producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
+    System.out.printf("Message with ID %s successfully sent", msgId);
+});
+```
+
+As you can see from the example above, async send operations return a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId} wrapped in a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture).
+
+### Configuring messages
+
+In addition to a value, it's possible to set additional items on a given message:
+
+```java
+producer.newMessage()
+    .key("my-message-key")
+    .value("my-async-message".getBytes())
+    .property("my-key", "my-value")
+    .property("my-other-key", "my-other-value")
+    .send();
+```
+
+As for the previous case, it's also possible to terminate the builder chain with `sendAsync()` and
+get a future returned.
+
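+For example, the same message builder terminated with `sendAsync()`:
+
+```java
+producer.newMessage()
+    .key("my-message-key")
+    .value("my-async-message".getBytes())
+    .sendAsync()
+    .thenAccept(msgId ->
+        System.out.printf("Message with ID %s successfully sent", msgId));
+```
+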
+## Consumers
+
+In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new [consumer](reference-terminology.md#consumer) by first instantiating a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
+
+Once you've instantiated a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object, you can create a {@inject: javadoc:Consumer:/client/org/apache/pulsar/client/api/Consumer} by specifying a [topic](reference-terminology.md#topic) and a [subscription](concepts-messaging.md#subscription-modes).
+
+```java
+Consumer consumer = client.newConsumer()
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .subscribe();
+```
+
+The `subscribe` method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then [acknowledges](reference-terminology.md#acknowledgment-ack) that the message has been processed:
+
+```java
+do {
+  // Wait for a message
+  Message msg = consumer.receive();
+
+  System.out.printf("Message received: %s", new String(msg.getData()));
+
+  // Acknowledge the message so that it can be deleted by the message broker
+  consumer.acknowledge(msg);
+} while (true);
+```
+
+### Configuring consumers
+
+If you instantiate a `Consumer` object specifying only a topic and subscription name, as in the example above, the consumer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set. For a full listing, see the Javadoc for the {@inject: javadoc:ConsumerBuilder:/client/org/apache/pulsar/client/api/ConsumerBuilder} class. Here's an example configuration:
+
+```java
+Consumer consumer = client.newConsumer()
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .ackTimeout(10, TimeUnit.SECONDS)
+        .subscriptionType(SubscriptionType.Exclusive)
+        .subscribe();
+```
+
+### Async receive
+
+The `receive` method will receive messages synchronously (the consumer process will be blocked until a message is available). You can also use [async receive](concepts-messaging.md#receive-modes), which will return immediately with a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture) object that completes once a new message is available.
+
+Here's an example:
+
+```java
+CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
+```
+
+Async receive operations return a {@inject: javadoc:Message:/client/org/apache/pulsar/client/api/Message} wrapped inside of a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture).
+
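+For example, you can attach a callback to the returned future instead of blocking:
+
+```java
+consumer.receiveAsync().thenAccept(msg -> {
+    System.out.printf("Message received: %s", new String(msg.getData()));
+    try {
+        // Acknowledge the message so the broker can delete it
+        consumer.acknowledge(msg);
+    } catch (PulsarClientException e) {
+        e.printStackTrace();
+    }
+});
+```
+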
+### Multi-topic subscriptions
+
+In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using [multi-topic subscriptions](concepts-messaging.md#multi-topic-subscriptions). To use multi-topic subscriptions you can supply either a regular expression (regex) or a `List` of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.
+
+Here are some examples:
+
+```java
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
+        .subscriptionName(subscription);
+
+// Subscribe to all topics in a namespace
+Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
+Consumer allTopicsConsumer = consumerBuilder
+        .topicsPattern(allTopicsInNamespace)
+        .subscribe();
+
+// Subscribe to a subset of topics in a namespace, based on a regex
+Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
+Consumer someTopicsConsumer = consumerBuilder
+        .topicsPattern(someTopicsInNamespace)
+        .subscribe();
+```
+
+You can also subscribe to an explicit list of topics (across namespaces if you wish):
+
+```java
+List<String> topics = Arrays.asList(
+        "topic-1",
+        "topic-2",
+        "topic-3"
+);
+
+Consumer multiTopicConsumer = consumerBuilder
+        .topics(topics)
+        .subscribe();
+
+// Alternatively:
+Consumer anotherMultiTopicConsumer = consumerBuilder
+        .topics(
+            "topic-1",
+            "topic-2",
+            "topic-3"
+        )
+        .subscribe();
+```
+
+You can also subscribe to multiple topics asynchronously using the `subscribeAsync` method rather than the synchronous `subscribe` method. Here's an example:
+
+```java
+Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
+consumerBuilder
+        .topicsPattern(allTopicsInNamespace)
+        .subscribeAsync()
+        .thenAccept(consumer -> {
+            do {
+                try {
+                    Message msg = consumer.receive();
+                    // Do something with the received message
+                } catch (PulsarClientException e) {
+                    e.printStackTrace();
+                }
+            } while (true);
+        });
+```
+
+## Reader interface {#readers}
+
+With the [reader interface](concepts-clients.md#reader-interface), Pulsar clients can "manually position" themselves within a topic, reading all messages from a specified message onward. The Pulsar API for Java enables you to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader} objects by specifying a [topic](reference-terminology.md#topic) and a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId} to start reading from.
+
+Here's an example:
+
+```java
+byte[] msgIdBytes = // Some message ID byte array
+MessageId id = MessageId.fromByteArray(msgIdBytes);
+Reader reader = pulsarClient.newReader()
+        .topic(topic)
+        .startMessageId(id)
+        .create();
+
+while (true) {
+    Message message = reader.readNext();
+    // Process message
+}
+```
+
+In the example above, a `Reader` object is instantiated for a specific topic and message (by ID); the reader then iterates over each message in the topic after the message identified by `msgIdBytes` (how that value is obtained depends on the application).
+
+The code sample above shows pointing the `Reader` object to a specific message (by ID), but you can also use `MessageId.earliest` to point to the earliest available message on the topic or `MessageId.latest` to point to the most recent available message.
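+
+For example, here's a sketch of a reader that starts from the earliest available message (reusing the `topic` variable from above):
+
+```java
+Reader reader = pulsarClient.newReader()
+        .topic(topic)
+        .startMessageId(MessageId.earliest)
+        .create();
+```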
+
+## Schemas
+
+In Pulsar, all message data consists of byte arrays "under the hood." [Message schemas](concepts-schema-registry.md) enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a [producer](#producers) without specifying a schema, then the producer can only produce messages of type `byte[]`. Here's an example:
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .create();
+```
+
+Such a producer is equivalent to a `Producer<byte[]>` (in fact, you should *always* explicitly specify the type parameter). If you'd like to use a producer for a different type of data, you'll need to specify a **schema** that informs Pulsar which data type will be transmitted over the [topic](reference-terminology.md#topic).
+
+### Schema example
+
+Let's say that you have a `SensorReading` class that you'd like to transmit over a Pulsar topic:
+
+```java
+public class SensorReading {
+    public float temperature;
+
+    public SensorReading(float temperature) {
+        this.temperature = temperature;
+    }
+
+    // A no-arg constructor is required
+    public SensorReading() {
+    }
+
+    public float getTemperature() {
+        return temperature;
+    }
+
+    public void setTemperature(float temperature) {
+        this.temperature = temperature;
+    }
+}
+```
+
+You could then create a `Producer<SensorReading>` (or `Consumer<SensorReading>`) like so:
+
+```java
+Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
+        .topic("sensor-readings")
+        .create();
+```
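+
+With the typed producer in place, you can publish `SensorReading` objects directly; a minimal sketch:
+
+```java
+producer.send(new SensorReading(21.5f));
+```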
+
+The following schema formats are currently available for Java:
+
+* No schema or the byte array schema (which can be applied using `Schema.BYTES`):
+
+  ```java
+  Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
+        .topic("some-raw-bytes-topic")
+        .create();
+  ```
+
+  Or, equivalently:
+
+  ```java
+  Producer<byte[]> bytesProducer = client.newProducer()
+        .topic("some-raw-bytes-topic")
+        .create();
+  ```
+
+* `String` for normal UTF-8-encoded string data. This schema can be applied using `Schema.STRING`:
+
+  ```java
+  Producer<String> stringProducer = client.newProducer(Schema.STRING)
+        .topic("some-string-topic")
+        .create();
+  ```
+* JSON schemas can be created for POJOs using the `JSONSchema` class. Here's an example:
+
+  ```java
+  Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class);
+  Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema)
+        .topic("some-pojo-topic")
+        .create();
+  ```
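+
+On the consuming side, the same schema gives you typed messages. Here's a sketch (assuming a `MyPojo` class as above):
+
+```java
+Consumer<MyPojo> pojoConsumer = client.newConsumer(JSONSchema.of(MyPojo.class))
+        .topic("some-pojo-topic")
+        .subscriptionName("my-subscription")
+        .subscribe();
+
+Message<MyPojo> msg = pojoConsumer.receive();
+MyPojo pojo = msg.getValue();
+```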
+
+## Authentication
+
+Pulsar currently supports two authentication schemes: [TLS](security-tls-authentication.md) and [Athenz](security-athenz.md). The Pulsar Java client can be used with both.
+
+### TLS Authentication
+
+To use [TLS](security-tls-authentication.md), you need to enable TLS on the client builder (via `enableTls(true)`), point the client to your trusted CA certificate, and provide paths to your client certificate and key files.
+
+Here's an example configuration:
+
+```java
+Map<String, String> authParams = new HashMap<>();
+authParams.put("tlsCertFile", "/path/to/client-cert.pem");
+authParams.put("tlsKeyFile", "/path/to/client-key.pem");
+
+Authentication tlsAuth = AuthenticationFactory
+        .create(AuthenticationTls.class.getName(), authParams);
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar+ssl://my-broker.com:6651")
+        .enableTls(true)
+        .tlsTrustCertsFilePath("/path/to/cacert.pem")
+        .authentication(tlsAuth)
+        .build();
+```
+
+### Athenz
+
+To use [Athenz](security-athenz.md) as an authentication provider, you need to [use TLS](#tls-authentication) and provide values for four parameters in a map:
+
+* `tenantDomain`
+* `tenantService`
+* `providerDomain`
+* `privateKey`
+
+You can also set an optional `keyId`. Here's an example configuration:
+
+```java
+Map<String, String> authParams = new HashMap<>();
+authParams.put("tenantDomain", "shopping"); // Tenant domain name
+authParams.put("tenantService", "some_app"); // Tenant service name
+authParams.put("providerDomain", "pulsar"); // Provider domain name
+authParams.put("privateKey", "file:///path/to/private.pem"); // Tenant private key path
+authParams.put("keyId", "v1"); // Key id for the tenant private key (optional, default: "0")
+
+Authentication athenzAuth = AuthenticationFactory
+        .create(AuthenticationAthenz.class.getName(), authParams);
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar+ssl://my-broker.com:6651")
+        .enableTls(true)
+        .tlsTrustCertsFilePath("/path/to/cacert.pem")
+        .authentication(athenzAuth)
+        .build();
+```
+
+> #### Supported pattern formats
+> The `privateKey` parameter supports the following three pattern formats:
+> * `file:///path/to/file`
+> * `file:/path/to/file`
+> * `data:application/x-pem-file;base64,<base64-encoded value>`
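+
+For example, you could construct the value for the `data:` format from a PEM file like this (a sketch; the `base64` flags shown are for GNU coreutils and vary by platform):
+
+```shell
+$ echo "data:application/x-pem-file;base64,$(base64 -w 0 /path/to/private.pem)"
+```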
diff --git a/site2/website/versioned_docs/version-2.2.1/client-libraries-websocket.md b/site2/website/versioned_docs/version-2.2.1/client-libraries-websocket.md
new file mode 100644
index 0000000000..eaf8aed29d
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/client-libraries-websocket.md
@@ -0,0 +1,444 @@
+---
+id: version-2.2.1-client-libraries-websocket
+title: Pulsar's WebSocket API
+sidebar_label: WebSocket
+original_id: client-libraries-websocket
+---
+
+Pulsar's [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) API is meant to provide a simple way to interact with Pulsar using languages that do not have an official [client library](getting-started-clients.md). Through WebSockets you can publish and consume messages and use all the features available in the [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python.md) and [C++](client-libraries-cpp.md) client libraries.
+
+
+> You can use Pulsar's WebSocket API with any WebSocket client library. See examples for Python and Node.js [below](#client-examples).
+
+## Running the WebSocket service
+
+The standalone variant of Pulsar that we recommend using for [local development](getting-started-standalone.md) already has the WebSocket service enabled.
+
+In non-standalone mode, there are two ways to deploy the WebSocket service:
+
+* [embedded](#embedded-with-a-pulsar-broker) with a Pulsar broker
+* as a [separate component](#as-a-separate-component)
+
+### Embedded with a Pulsar broker
+
+In this mode, the WebSocket service will run within the same HTTP service that's already running in the broker. To enable this mode, set the [`webSocketServiceEnabled`](reference-configuration.md#broker-webSocketServiceEnabled) parameter to `true` in the [`conf/broker.conf`](reference-configuration.md#broker) configuration file in your installation:
+
+```properties
+webSocketServiceEnabled=true
+```
+
+### As a separate component
+
+In this mode, the WebSocket service runs as a separate service rather than within a Pulsar [broker](reference-terminology.md#broker). Configuration for this mode is handled in the [`conf/websocket.conf`](reference-configuration.md#websocket) configuration file. You'll need to set *at least* the following parameters:
+
+* [`globalZookeeperServers`](reference-configuration.md#websocket-globalZookeeperServers)
+* [`webServicePort`](reference-configuration.md#websocket-webServicePort)
+* [`clusterName`](reference-configuration.md#websocket-clusterName)
+
+Here's an example:
+
+```properties
+globalZookeeperServers=zk1:2181,zk2:2181,zk3:2181
+webServicePort=8080
+clusterName=my-cluster
+```
+
+### Starting the WebSocket service
+
+When the configuration is set, you can start the WebSocket service using the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) tool:
+
+```shell
+$ bin/pulsar-daemon start websocket
+```
+
+## API Reference
+
+Pulsar's WebSocket API offers three endpoints for [producing](#producer-endpoint) messages, [consuming](#consumer-endpoint) messages, and [reading](#reader-endpoint) messages.
+
+All exchanges via the WebSocket API use JSON.
+
+### Producer endpoint
+
+The producer endpoint requires you to specify a tenant, namespace, and topic in the URL:
+
+```http
+ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
+```
+
+##### Query param
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`sendTimeoutMillis` | long | no | Send timeout (default: 30 secs)
+`batchingEnabled` | boolean | no | Enable batching of messages (default: false)
+`batchingMaxMessages` | int | no | Maximum number of messages permitted in a batch (default: 1000)
+`maxPendingMessages` | int | no | Set the max size of the internal-queue holding the messages (default: 1000)
+`batchingMaxPublishDelay` | long | no | Time period within which the messages will be batched (default: 10ms)
+`messageRoutingMode` | string | no | Message [routing mode](https://pulsar.apache.org/api/client/index.html?org/apache/pulsar/client/api/ProducerConfiguration.MessageRoutingMode.html) for the partitioned producer: `SinglePartition`, `RoundRobinPartition`
+`compressionType` | string | no | Compression [type](https://pulsar.apache.org/api/client/index.html?org/apache/pulsar/client/api/CompressionType.html): `LZ4`, `ZLIB`
+`producerName` | string | no | Specify the name for the producer. Pulsar enforces that only one producer with the same name can publish on a topic
+`initialSequenceId` | long | no | Set the baseline for the sequence ids for messages published by the producer.
+`hashingScheme` | string | no | [Hashing function](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.HashingScheme.html) to use when publishing on a partitioned topic: `JavaStringHash`, `Murmur3_32Hash`
+
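+Query parameters are appended to the endpoint URL in the usual way. For example (the parameter values here are purely illustrative):
+
+```http
+ws://broker-service-url:8080/ws/v2/producer/persistent/public/default/my-topic?batchingEnabled=true&sendTimeoutMillis=10000
+```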
+
+#### Publishing a message
+
+```json
+{
+  "payload": "SGVsbG8gV29ybGQ=",
+  "properties": {"key1": "value1", "key2": "value2"},
+  "context": "1"
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`payload` | string | yes | Base-64 encoded payload
+`properties` | key-value pairs | no | Application-defined properties
+`context` | string | no | Application-defined request identifier
+`key` | string | no | For partitioned topics, decides which partition to use
+`replicationClusters` | array | no | Restrict replication to this list of [clusters](reference-terminology.md#cluster), specified by name
+
+
+##### Example success response
+
+```json
+{
+   "result": "ok",
+   "messageId": "CAAQAw==",
+   "context": "1"
+ }
+```
+
+##### Example failure response
+
+```json
+ {
+   "result": "send-error:3",
+   "errorMsg": "Failed to de-serialize from JSON",
+   "context": "1"
+ }
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`result` | string | yes | `ok` if successful or an error message if unsuccessful
+`messageId` | string | yes | Message ID assigned to the published message
+`context` | string | no | Application-defined request identifier
+
+
+### Consumer endpoint
+
+The consumer endpoint requires you to specify a tenant, namespace, and topic, as well as a subscription, in the URL:
+
+```http
+ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription
+```
+
+##### Query param
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`ackTimeoutMillis` | long | no | Set the timeout for unacked messages (default: 0)
+`subscriptionType` | string | no | [Subscription type](https://pulsar.apache.org/api/client/index.html?org/apache/pulsar/client/api/SubscriptionType.html): `Exclusive`, `Failover`, `Shared`
+`receiverQueueSize` | int | no | Size of the consumer receive queue (default: 1000)
+`consumerName` | string | no | Consumer name
+`priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer
+`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
+`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
+`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.
+
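+For example (the parameter values here are purely illustrative):
+
+```http
+ws://broker-service-url:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub?subscriptionType=Shared&receiverQueueSize=500
+```
+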
+NB: these parameters (except `pullMode`) apply to the internal consumer of the WebSocket service.
+So messages will be subject to the redelivery settings as soon as they get into the receive queue,
+even if the client doesn't consume them over the WebSocket.
+
+##### Receiving messages
+
+The server will push messages over the WebSocket session:
+
+```json
+{
+  "messageId": "CAAQAw==",
+  "payload": "SGVsbG8gV29ybGQ=",
+  "properties": {"key1": "value1", "key2": "value2"},
+  "publishTime": "2016-08-30 16:45:57.785"
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`messageId` | string | yes | Message ID
+`payload` | string | yes | Base-64 encoded payload
+`publishTime` | string | yes | Publish timestamp
+`properties` | key-value pairs | no | Application-defined properties
+`key` | string | no |  Original routing key set by producer
+
+#### Acknowledging the message
+
+The consumer needs to acknowledge successful processing of a message so that
+the Pulsar broker can delete it:
+
+```json
+{
+  "messageId": "CAAQAw=="
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`messageId`| string | yes | Message ID of the processed message
+
+#### Flow control
+
+##### Push Mode
+
+By default (`pullMode=false`), the consumer endpoint uses the `receiverQueueSize` parameter both to size its
+internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client.
+In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after
+`receiverQueueSize` unacknowledged messages have been sent to the WebSocket client.
+
+##### Pull Mode
+
+If you set `pullMode` to `true`, the WebSocket client will need to send `permit` commands to allow the
+Pulsar WebSocket service to send more messages:
+
+```json
+{
+  "type": "permit",
+  "permitMessages": 100
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`type`| string | yes | Type of command. Must be `permit`
+`permitMessages`| int | yes | Number of messages to permit
+
+NB: in this mode it's possible to acknowledge messages from a different connection.
+
+### Reader endpoint
+
+The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:
+
+```http
+ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
+```
+
+##### Query param
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`readerName` | string | no | Reader name
+`receiverQueueSize` | int | no | Size of the consumer receive queue (default: 1000)
+`messageId` | int or enum | no | Message ID to start from, `earliest` or `latest` (default: `latest`)
+
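+For example, to start reading from the earliest available message (a purely illustrative URL):
+
+```http
+ws://broker-service-url:8080/ws/v2/reader/persistent/public/default/my-topic?messageId=earliest
+```
+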
+##### Receiving messages
+
+The server will push messages over the WebSocket session:
+
+```json
+{
+  "messageId": "CAAQAw==",
+  "payload": "SGVsbG8gV29ybGQ=",
+  "properties": {"key1": "value1", "key2": "value2"},
+  "publishTime": "2016-08-30 16:45:57.785"
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`messageId` | string | yes | Message ID
+`payload` | string | yes | Base-64 encoded payload
+`publishTime` | string | yes | Publish timestamp
+`properties` | key-value pairs | no | Application-defined properties
+`key` | string | no |  Original routing key set by producer
+
+#### Acknowledging the message
+
+**In the WebSocket API**, the reader needs to acknowledge successful processing of each message so that
+the Pulsar WebSocket service can update the count of pending messages.
+If you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching the pending-messages limit.
+
+```json
+{
+  "messageId": "CAAQAw=="
+}
+```
+
+Key | Type | Required? | Explanation
+:---|:-----|:----------|:-----------
+`messageId`| string | yes | Message ID of the processed message
+
+
+### Error codes
+
+In case of error, the server will close the WebSocket session using one of the
+following error codes:
+
+Error Code | Error Message
+:----------|:-------------
+1 | Failed to create producer
+2 | Failed to subscribe
+3 | Failed to deserialize from JSON
+4 | Failed to serialize to JSON
+5 | Failed to authenticate client
+6 | Client is not authorized
+7 | Invalid payload encoding
+8 | Unknown error
+
+> The application is responsible for re-establishing a new WebSocket session after a backoff period.
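+
+A minimal reconnection sketch in Python (using the `websocket-client` package introduced below; the backoff parameters are illustrative):
+
+```python
+import time
+import websocket
+
+def connect_with_backoff(url, initial_delay=1, max_delay=30):
+    # Retry the WebSocket connection with exponential backoff
+    delay = initial_delay
+    while True:
+        try:
+            return websocket.create_connection(url)
+        except Exception:
+            time.sleep(delay)
+            delay = min(delay * 2, max_delay)
+```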
+
+## Client examples
+
+Below you'll find code examples for the Pulsar WebSocket API in [Python](#python) and [Node.js](#nodejs).
+
+### Python
+
+This example uses the [`websocket-client`](https://pypi.python.org/pypi/websocket-client) package. You can install it using [pip](https://pypi.python.org/pypi/pip):
+
+```shell
+$ pip install websocket-client
+```
+
+You can also download it from [PyPI](https://pypi.python.org/pypi/websocket-client).
+
+#### Python producer
+
+Here's an example Python producer that sends a simple message to a Pulsar [topic](reference-terminology.md#topic):
+
+```python
+import websocket, base64, json
+
+TOPIC = 'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'
+
+ws = websocket.create_connection(TOPIC)
+
+# Send one message as JSON
+ws.send(json.dumps({
+    'payload': base64.b64encode(b'Hello World').decode('utf-8'),
+    'properties': {
+        'key1': 'value1',
+        'key2': 'value2'
+    },
+    'context': '5'
+}))
+
+response = json.loads(ws.recv())
+if response['result'] == 'ok':
+    print('Message published successfully')
+else:
+    print('Failed to publish message:', response)
+ws.close()
+```
+
+#### Python consumer
+
+Here's an example Python consumer that listens on a Pulsar topic and prints the message ID whenever a message arrives:
+
+```python
+import websocket, base64, json
+
+TOPIC = 'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'
+
+ws = websocket.create_connection(TOPIC)
+
+while True:
+    msg = json.loads(ws.recv())
+    if not msg: break
+
+    print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
+
+    # Acknowledge successful processing
+    ws.send(json.dumps({'messageId' : msg['messageId']}))
+
+ws.close()
+```
+
+#### Python reader
+
+Here's an example Python reader that listens on a Pulsar topic and prints the message ID whenever a message arrives:
+
+```python
+import websocket, base64, json
+
+TOPIC = 'ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'
+
+ws = websocket.create_connection(TOPIC)
+
+while True:
+    msg = json.loads(ws.recv())
+    if not msg: break
+
+    print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
+
+    # Acknowledge successful processing
+    ws.send(json.dumps({'messageId' : msg['messageId']}))
+
+ws.close()
+```
+
+### Node.js
+
+This example uses the [`ws`](https://websockets.github.io/ws/) package. You can install it using [npm](https://www.npmjs.com/):
+
+```shell
+$ npm install ws
+```
+
+#### Node.js producer
+
+Here's an example Node.js producer that sends a simple message to a Pulsar topic:
+
+```javascript
+var WebSocket = require('ws'),
+    topic = "ws://localhost:8080/ws/v2/producer/persistent/my-tenant/my-ns/my-topic1",
+    ws = new WebSocket(topic);
+
+var message = {
+  "payload" : new Buffer("Hello World").toString('base64'),
+  "properties": {
+    "key1" : "value1",
+    "key2" : "value2"
+  },
+  "context" : "1"
+};
+
+ws.on('open', function() {
+  // Send one message
+  ws.send(JSON.stringify(message));
+});
+
+ws.on('message', function(message) {
+  console.log('received ack: %s', message);
+});
+```
+
+#### Node.js consumer
+
+Here's an example Node.js consumer that listens on the same topic used by the producer above:
+
+```javascript
+var WebSocket = require('ws'),
+    topic = "ws://localhost:8080/ws/v2/consumer/persistent/my-tenant/my-ns/my-topic1/my-sub",
+    ws = new WebSocket(topic);
+
+ws.on('message', function(message) {
+    var receiveMsg = JSON.parse(message);
+    console.log('Received: %s - payload: %s', message, Buffer.from(receiveMsg.payload, 'base64').toString());
+    var ackMsg = {"messageId" : receiveMsg.messageId};
+    ws.send(JSON.stringify(ackMsg));
+});
+```
+
+#### Node.js reader
+
+Here's an example Node.js reader that reads from the same topic and acknowledges each message:
+
+```javascript
+var WebSocket = require('ws'),
+    topic = "ws://localhost:8080/ws/v2/reader/persistent/my-tenant/my-ns/my-topic1",
+    ws = new WebSocket(topic);
+
+ws.on('message', function(message) {
+    var receiveMsg = JSON.parse(message);
+    console.log('Received: %s - payload: %s', message, Buffer.from(receiveMsg.payload, 'base64').toString());
+    var ackMsg = {"messageId" : receiveMsg.messageId};
+    ws.send(JSON.stringify(ackMsg));
+});
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/concepts-clients.md b/site2/website/versioned_docs/version-2.2.1/concepts-clients.md
new file mode 100644
index 0000000000..103fb201ef
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/concepts-clients.md
@@ -0,0 +1,80 @@
+---
+id: version-2.2.1-concepts-clients
+title: Pulsar Clients
+sidebar_label: Clients
+original_id: concepts-clients
+---
+
+Pulsar exposes a client API with language bindings for [Java](client-libraries-java.md),  [Go](client-libraries-go.md), [Python](client-libraries-python.md) and [C++](client-libraries-cpp.md). The client API optimizes and encapsulates Pulsar's client-broker communication protocol and exposes a simple and intuitive API for use by applications.
+
+Under the hood, the current official Pulsar client libraries support transparent reconnection and/or connection failover to brokers, queuing of messages until acknowledged by the broker, and heuristics such as connection retries with backoff.
+
+> #### Custom client libraries
+> If you'd like to create your own client library, we recommend consulting the documentation on Pulsar's custom [binary protocol](developing-binary-protocol.md).
+
+
+## Client setup phase
+
+When an application wants to create a producer/consumer, the Pulsar client library will initiate a setup phase that is composed of two steps:
+
+1. The client will attempt to determine the owner of the topic by sending an HTTP lookup request to the broker. The request could reach any of the active brokers which, by looking at the (cached) ZooKeeper metadata, will know who is serving the topic or, if nobody is serving it, will try to assign it to the least loaded broker.
+1. Once the client library has the broker address, it will create a TCP connection (or reuse an existing connection from the pool) and authenticate it. Within this connection, client and broker exchange binary commands from a custom protocol. At this point, the client will send a command to create a producer/consumer to the broker, which will comply after having validated the authorization policy.
+
+Whenever the TCP connection breaks, the client will immediately re-initiate this setup phase and will keep trying with exponential backoff to re-establish the producer or consumer until the operation succeeds.
+
+## Reader interface
+
+In Pulsar, the "standard" [consumer interface](concepts-messaging.md#consumers) involves using consumers to listen on [topics](reference-terminology.md#topic), process incoming messages, and finally acknowledge those messages when they've been processed. Whenever a consumer connects to a topic, it automatically begins reading from the earliest un-acked message onward because the topic's cursor is automatically managed by Pulsar.
+
+The **reader interface** for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic, rather than a consumer, you need to specify *which* message the reader begins reading from. When connecting to a topic, the reader interface enables you to begin with:
+
+* The **earliest** available message in the topic
+* The **latest** available message in the topic
+* Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.
+
+The reader interface is helpful for use cases like using Pulsar to provide [effectively-once](https://streaml.io/blog/exactly-once/) processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.
+
+![The Pulsar consumer and reader interfaces](assets/pulsar-reader-consumer-interfaces.png)
+
+> ### Non-partitioned topics only
+> The reader interface for Pulsar cannot currently be used with [partitioned topics](concepts-messaging.md#partitioned-topics).
+
+Here's a Java example that begins reading from the earliest available message on a topic:
+
+```java
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+
+// Create a reader on a topic and for a specific message (and onward)
+Reader<byte[]> reader = pulsarClient.newReader()
+    .topic("reader-api-test")
+    .startMessageId(MessageId.earliest)
+    .create();
+
+while (true) {
+    Message<byte[]> message = reader.readNext();
+
+    // Process the message
+}
+```
+
+To create a reader that will read from the latest available message:
+
+```java
+Reader<byte[]> reader = pulsarClient.newReader()
+    .topic(topic)
+    .startMessageId(MessageId.latest)
+    .create();
+```
+
+To create a reader that will read from some message between earliest and latest:
+
+```java
+byte[] msgIdBytes = // Some byte array
+MessageId id = MessageId.fromByteArray(msgIdBytes);
+Reader<byte[]> reader = pulsarClient.newReader()
+    .topic(topic)
+    .startMessageId(id)
+    .create();
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/concepts-overview.md b/site2/website/versioned_docs/version-2.2.1/concepts-overview.md
new file mode 100644
index 0000000000..b88afe6228
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/concepts-overview.md
@@ -0,0 +1,32 @@
+---
+id: version-2.2.1-concepts-overview
+title: Pulsar Overview
+sidebar_label: Overview
+original_id: concepts-overview
+---
+
+Pulsar is a multi-tenant, high-performance solution for server-to-server messaging originally developed by [Yahoo](http://yahoo.github.io/) and now under the stewardship of the [Apache Software Foundation](https://www.apache.org/).
+
+Pulsar's key features include:
+
+* Native support for multiple clusters in a Pulsar instance, with seamless [geo-replication](administration-geo.md) of messages across clusters
+* Very low publish and end-to-end latency
+* Seamless scalability out to over a million topics
+* A simple [client API](concepts-clients.md) with bindings for [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python.md) and [C++](client-libraries-cpp.md)
+* Multiple [subscription modes](concepts-messaging.md#subscription-modes) for topics ([exclusive](concepts-messaging.md#exclusive), [shared](concepts-messaging.md#shared), and [failover](concepts-messaging.md#failover))
+* Guaranteed message delivery with [persistent message storage](concepts-architecture-overview.md#persistent-storage) provided by [Apache BookKeeper](http://bookkeeper.apache.org/)
+* [Pulsar Functions](functions-overview.md), a serverless, lightweight computing framework for stream-native data processing
+* [Pulsar IO](io-overview.md), a serverless connector framework built on top of Pulsar Functions that makes it easier to move data into and out of Apache Pulsar
+* [Tiered Storage](concepts-tiered-storage.md), which offloads data from hot/warm storage to cold/long-term storage (such as S3 and GCS) as the data ages out
+
+## Contents
+
+- [Messaging Concepts](concepts-messaging.md)
+- [Architecture Overview](concepts-architecture-overview.md)
+- [Pulsar Clients](concepts-clients.md)
+- [Geo Replication](concepts-replication.md)
+- [Multi Tenancy](concepts-multi-tenancy.md)
+- [Authentication and Authorization](concepts-authentication.md)
+- [Topic Compaction](concepts-topic-compaction.md)
+- [Tiered Storage](concepts-tiered-storage.md)
+- [Schema Registry](concepts-schema-registry.md)
diff --git a/site2/website/versioned_docs/version-2.2.1/deploy-bare-metal.md b/site2/website/versioned_docs/version-2.2.1/deploy-bare-metal.md
new file mode 100644
index 0000000000..6a04ecffbb
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/deploy-bare-metal.md
@@ -0,0 +1,384 @@
+---
+id: version-2.2.1-deploy-bare-metal
+title: Deploying a cluster on bare metal
+sidebar_label: Bare metal
+original_id: deploy-bare-metal
+---
+
+
+> ### Tips
+>
+> 1. Single-cluster Pulsar installations should be sufficient for all but the most ambitious use cases. If you're interested in experimenting with
+> Pulsar or using it in a startup or on a single team, we recommend opting for a single cluster. If you do need to run a multi-cluster Pulsar instance,
+> however, see the guide [here](deploy-bare-metal-multi-cluster.md).
+>
+> 2. If you want to use all builtin [Pulsar IO](io-overview.md) connectors in your Pulsar deployment, you need to download the `apache-pulsar-io-connectors`
+> package and make sure it is installed under the `connectors` directory in the Pulsar directory on every broker node (or on every function-worker node, if you
+> run a separate cluster of function workers for [Pulsar Functions](functions-overview.md)).
+
+Deploying a Pulsar cluster involves doing the following (in order):
+
+* Deploying a [ZooKeeper](#deploying-a-zookeeper-cluster) cluster (optional)
+* Initializing [cluster metadata](#initializing-cluster-metadata)
+* Deploying a [BookKeeper](#deploying-a-bookkeeper-cluster) cluster
+* Deploying one or more Pulsar [brokers](#deploying-pulsar-brokers)
+
+## Preparation
+
+### Requirements
+
+> If you already have an existing ZooKeeper cluster and would like to reuse it, you don't need to prepare the machines
+> for running ZooKeeper.
+
+To run Pulsar on bare metal, we recommend that you have:
+
+* At least 6 Linux machines or VMs
+  * 3 running [ZooKeeper](https://zookeeper.apache.org)
+  * 3 running a Pulsar broker, and a [BookKeeper](https://bookkeeper.apache.org) bookie
+* A single [DNS](https://en.wikipedia.org/wiki/Domain_Name_System) name covering all of the Pulsar broker hosts
+
+> However, if you don't have enough machines, or want to try out Pulsar in cluster mode (and expand the cluster later),
+> you can even deploy Pulsar on a single node, where ZooKeeper, a bookie, and a broker will all run on the same machine.
+
+Each machine in your cluster will need to have [Java 8](http://www.oracle.com/technetwork/java/javase/downloads/index.html) or higher installed.
+
+Here's a diagram showing the basic setup:
+
+![alt-text](assets/pulsar-basic-setup.png)
+
+In this diagram, connecting clients need to be able to communicate with the Pulsar cluster using a single URL, in this case `pulsar-cluster.acme.com`, that abstracts over all of the message-handling brokers. Pulsar message brokers run on machines alongside BookKeeper bookies; brokers and bookies, in turn, rely on ZooKeeper.
+
+### Hardware considerations
+
+When deploying a Pulsar cluster, we have some basic recommendations that you should keep in mind when capacity planning.
+
+#### ZooKeeper
+
+For machines running ZooKeeper, we recommend using lighter-weight machines or VMs. Pulsar uses ZooKeeper only for periodic coordination- and configuration-related tasks, *not* for basic operations. If you're running Pulsar on [Amazon Web Services](https://aws.amazon.com/) (AWS), for example, a [t2.small](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/t2-instances.html) instance would likely suffice.
+
+#### Bookies & Brokers
+
+For machines running a bookie and a Pulsar broker, we recommend using more powerful machines. For an AWS deployment, for example, [i3.4xlarge](https://aws.amazon.com/blogs/aws/now-available-i3-instances-for-demanding-io-intensive-applications/) instances may be appropriate. On those machines we also recommend:
+
+* Fast CPUs and 10Gbps [NIC](https://en.wikipedia.org/wiki/Network_interface_controller) (for Pulsar brokers)
+* Small and fast [solid-state drives](https://en.wikipedia.org/wiki/Solid-state_drive) (SSDs) or [hard disk drives](https://en.wikipedia.org/wiki/Hard_disk_drive) (HDDs) with a [RAID](https://en.wikipedia.org/wiki/RAID) controller and a battery-backed write cache (for BookKeeper bookies)
+
+## Installing the Pulsar binary package
+
+> You'll need to install the Pulsar binary package on *each machine in the cluster*, including machines running [ZooKeeper](#deploying-a-zookeeper-cluster) and [BookKeeper](#deploying-a-bookkeeper-cluster).
+
+To get started deploying a Pulsar cluster on bare metal, you'll need to download a binary tarball release in one of the following ways:
+
+* By clicking on the link directly below, which will automatically trigger a download:
+  * <a href="pulsar:binary_release_url" download>Pulsar {{pulsar:version}} binary release</a>
+* From the Pulsar [downloads page](pulsar:download_page_url)
+* From the Pulsar [releases page](https://github.com/apache/pulsar/releases/latest) on [GitHub](https://github.com)
+* Using [wget](https://www.gnu.org/software/wget):
+
+```bash
+$ wget pulsar:binary_release_url
+```
+
+Once you've downloaded the tarball, untar it and `cd` into the resulting directory:
+
+```bash
+$ tar xvzf apache-pulsar-{{pulsar:version}}-bin.tar.gz
+$ cd apache-pulsar-{{pulsar:version}}
+```
+
+The untarred directory contains the following subdirectories:
+
+Directory | Contains
+:---------|:--------
+`bin` | Pulsar's [command-line tools](reference-cli-tools.md), such as [`pulsar`](reference-cli-tools.md#pulsar) and [`pulsar-admin`](reference-pulsar-admin.md)
+`conf` | Configuration files for Pulsar, including for [broker configuration](reference-configuration.md#broker), [ZooKeeper configuration](reference-configuration.md#zookeeper), and more
+`data` | The data storage directory used by ZooKeeper and BookKeeper
+`lib` | The [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) files used by Pulsar
+`logs` | Logs created by the installation
+
+## Installing Builtin Connectors (optional)
+
+> Since release `2.1.0-incubating`, Pulsar releases a separate binary distribution containing all the `builtin` connectors.
+> If you would like to enable those `builtin` connectors, you can follow the instructions below; otherwise you can
+> skip this section for now.
+
+To get started using builtin connectors, you'll need to download the connectors tarball release on every broker node in
+one of the following ways:
+
+* by clicking the link below and downloading the release from an Apache mirror:
+
+  * <a href="pulsar:connector_release_url" download>Pulsar IO Connectors {{pulsar:version}} release</a>
+
+* from the Pulsar [downloads page](pulsar:download_page_url)
+* from the Pulsar [releases page](https://github.com/apache/pulsar/releases/latest)
+* using [wget](https://www.gnu.org/software/wget):
+
+  ```shell
+  $ wget pulsar:connector_release_url
+  ```
+
+Once the tarball is downloaded, untar the io-connectors package in the Pulsar directory and copy its `connectors` directory
+into the Pulsar directory:
+
+```bash
+$ tar xvfz apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
+
+// you will find a directory named `apache-pulsar-io-connectors-{{pulsar:version}}` in the pulsar directory
+// then copy the connectors
+
+$ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
+
+$ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
+...
+```
+
+## Deploying a ZooKeeper cluster
+
+> If you already have an existing ZooKeeper cluster and would like to use it, you can skip this section.
+
+[ZooKeeper](https://zookeeper.apache.org) manages a variety of essential coordination- and configuration-related tasks for Pulsar. To deploy a Pulsar cluster you'll need to deploy ZooKeeper first (before all other components). We recommend deploying a 3-node ZooKeeper cluster. Pulsar does not make heavy use of ZooKeeper, so more lightweight machines or VMs should suffice for running ZooKeeper.
+
+To begin, add all ZooKeeper servers to the configuration specified in [`conf/zookeeper.conf`](reference-configuration.md#zookeeper) (in the Pulsar directory you created [above](#installing-the-pulsar-binary-package)). Here's an example:
+
+```properties
+server.1=zk1.us-west.example.com:2888:3888
+server.2=zk2.us-west.example.com:2888:3888
+server.3=zk3.us-west.example.com:2888:3888
+```
+
+> If you have only one machine to deploy Pulsar, you just need to add one server entry in the configuration file.
+
+On each host, you need to specify the ID of the node in each node's `myid` file, which is in each server's `data/zookeeper` folder by default (this can be changed via the [`dataDir`](reference-configuration.md#zookeeper-dataDir) parameter).
+
+> See the [Multi-server setup guide](https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup) in the ZooKeeper documentation for detailed info on `myid` and more.
+
+On a ZooKeeper server at `zk1.us-west.example.com`, for example, you could set the `myid` value like this:
+
+```bash
+$ mkdir -p data/zookeeper
+$ echo 1 > data/zookeeper/myid
+```
+
+On `zk2.us-west.example.com` the command would be `echo 2 > data/zookeeper/myid` and so on.
+
+Once each server has been added to the `zookeeper.conf` configuration and has the appropriate `myid` entry, you can start ZooKeeper on all hosts (in the background, using nohup) with the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) CLI tool:
+
+```bash
+$ bin/pulsar-daemon start zookeeper
+```
+
+> If you are planning to deploy ZooKeeper and a bookie on the same node, you
+> need to start ZooKeeper with a different stats port.
+
+To do so, start ZooKeeper with the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) CLI tool like this:
+
+```bash
+$ PULSAR_EXTRA_OPTS="-Dstats_server_port=8001" bin/pulsar-daemon start zookeeper
+```
+
+## Initializing cluster metadata
+
+Once you've deployed ZooKeeper for your cluster, there is some metadata that needs to be written to ZooKeeper for each cluster in your instance. It only needs to be written **once**.
+
+You can initialize this metadata using the [`initialize-cluster-metadata`](reference-cli-tools.md#pulsar-initialize-cluster-metadata) command of the [`pulsar`](reference-cli-tools.md#pulsar) CLI tool. This command can be run on any machine in your ZooKeeper cluster. Here's an example:
+
+```shell
+$ bin/pulsar initialize-cluster-metadata \
+  --cluster pulsar-cluster-1 \
+  --zookeeper zk1.us-west.example.com:2181 \
+  --configuration-store zk1.us-west.example.com:2181 \
+  --web-service-url http://pulsar.us-west.example.com:8080 \
+  --web-service-url-tls https://pulsar.us-west.example.com:8443 \
+  --broker-service-url pulsar://pulsar.us-west.example.com:6650 \
+  --broker-service-url-tls pulsar+ssl://pulsar.us-west.example.com:6651
+```
+
+As you can see from the example above, the following needs to be specified:
+
+Flag | Description
+:----|:-----------
+`--cluster` | A name for the cluster
+`--zookeeper` | A "local" ZooKeeper connection string for the cluster. This connection string only needs to include *one* machine in the ZooKeeper cluster.
+`--configuration-store` | The configuration store connection string for the entire instance. As with the `--zookeeper` flag, this connection string only needs to include *one* machine in the ZooKeeper cluster.
+`--web-service-url` | The web service URL for the cluster, plus a port. This URL should be a standard DNS name. The default port is 8080 (we don't recommend using a different port).
+`--web-service-url-tls` | If you're using [TLS](security-tls-transport.md), you'll also need to specify a TLS web service URL for the cluster. The default port is 8443 (we don't recommend using a different port).
+`--broker-service-url` | A broker service URL enabling interaction with the brokers in the cluster. This URL should use the same DNS name as the web service URL but should use the `pulsar` scheme instead. The default port is 6650 (we don't recommend using a different port).
+`--broker-service-url-tls` | If you're using [TLS](security-tls-transport.md), you'll also need to specify a TLS web service URL for the cluster as well as a TLS broker service URL for the brokers in the cluster. The default port is 6651 (we don't recommend using a different port).
+
+## Deploying a BookKeeper cluster
+
+[BookKeeper](https://bookkeeper.apache.org) handles all persistent data storage in Pulsar. You will need to deploy a cluster of BookKeeper bookies to use Pulsar. We recommend running a **3-bookie BookKeeper cluster**.
+
+BookKeeper bookies can be configured using the [`conf/bookkeeper.conf`](reference-configuration.md#bookkeeper) configuration file. The most important step in configuring bookies for our purposes here is ensuring that the [`zkServers`](reference-configuration.md#bookkeeper-zkServers) is set to the connection string for the ZooKeeper cluster. Here's an example:
+
+```properties
+zkServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
+```
+
+Once you've appropriately modified the `zkServers` parameter, you can provide any other configuration modifications you need. You can find a full listing of the available BookKeeper configuration parameters [here](reference-configuration.md#bookkeeper), although we would recommend consulting the [BookKeeper documentation](http://bookkeeper.apache.org/docs/latest/reference/config/) for a more in-depth guide.
+
+> ##### NOTES
+>
+> Since the Pulsar 2.1.0 release, Pulsar Functions support [stateful functions](functions-state.md). If you would like to enable that feature,
+> you need to enable the table service in BookKeeper by adding the following setting to the `conf/bookkeeper.conf` file.
+>
+> ```conf
+> extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
+> ```
+
+Once you've applied the desired configuration in `conf/bookkeeper.conf`, you can start up a bookie on each of your BookKeeper hosts. You can start up each bookie either in the background, using [nohup](https://en.wikipedia.org/wiki/Nohup), or in the foreground.
+
+To start the bookie in the background, use the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) CLI tool:
+
+```bash
+$ bin/pulsar-daemon start bookie
+```
+
+To start the bookie in the foreground:
+
+```bash
+$ bin/bookkeeper bookie
+```
+
+You can verify that a bookie is working properly by running the `bookiesanity` command for the [BookKeeper shell](reference-cli-tools.md#shell) on it:
+
+```bash
+$ bin/bookkeeper shell bookiesanity
+```
+
+This will create an ephemeral BookKeeper ledger on the local bookie, write a few entries, read them back, and finally delete the ledger.
+
+After you have started all the bookies, you can use the `simpletest` command of the [BookKeeper shell](reference-cli-tools.md#shell) on any bookie node to
+verify that all the bookies in the cluster are up and running.
+
+```bash
+$ bin/bookkeeper shell simpletest --ensemble <num-bookies> --writeQuorum <num-bookies> --ackQuorum <num-bookies> --numEntries <num-entries>
+```
+
+This command will create a `num-bookies` sized ledger on the cluster, write a few entries, and finally delete the ledger.
+
+
+## Deploying Pulsar brokers
+
+Pulsar brokers are the last thing you need to deploy in your Pulsar cluster. Brokers handle Pulsar messages and provide Pulsar's administrative interface. We recommend running **3 brokers**, one for each machine that's already running a BookKeeper bookie.
+
+### Configuring Brokers
+
+The most important element of broker configuration is ensuring that each broker is aware of the ZooKeeper cluster that you've deployed. Make sure that the [`zookeeperServers`](reference-configuration.md#broker-zookeeperServers) and [`configurationStoreServers`](reference-configuration.md#broker-configurationStoreServers) parameters are set correctly. In this case, since we only have one cluster and no separate configuration store, `configurationStoreServers` will point to the same servers as `zookeeperServers`.
+
+```properties
+zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
+configurationStoreServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
+```
+
+You also need to specify the cluster name (matching the name that you provided when [initializing the cluster's metadata](#initializing-cluster-metadata)):
+
+```properties
+clusterName=pulsar-cluster-1
+```
+
+> If you deploy Pulsar in a one-node cluster, you should update the replication settings in `conf/broker.conf` to `1`
+>
+> ```properties
+> # Number of bookies to use when creating a ledger
+> managedLedgerDefaultEnsembleSize=1
+>
+> # Number of copies to store for each message
+> managedLedgerDefaultWriteQuorum=1
+> 
+> # Number of guaranteed copies (acks to wait before write is complete)
+> managedLedgerDefaultAckQuorum=1
+> ```
+
+### Enabling Pulsar Functions (optional)
+
+If you want to enable [Pulsar Functions](functions-overview.md), follow the instructions below:
+
+1. Edit `conf/broker.conf` to enable function worker, by setting `functionsWorkerEnabled` to `true`.
+
+    ```conf
+    functionsWorkerEnabled=true
+    ```
+
+2. Edit `conf/functions_worker.yml` and set `pulsarFunctionsCluster` to the cluster name that you provided when [initializing the cluster's metadata](#initializing-cluster-metadata). 
+
+    ```conf
+    pulsarFunctionsCluster: pulsar-cluster-1
+    ```
+
+### Starting Brokers
+
+You can then provide any other configuration changes that you'd like in the [`conf/broker.conf`](reference-configuration.md#broker) file. Once you've decided on a configuration, you can start up the brokers for your Pulsar cluster. Like ZooKeeper and BookKeeper, brokers can be started either in the foreground or in the background, using nohup.
+
+You can start a broker in the foreground using the [`pulsar broker`](reference-cli-tools.md#pulsar-broker) command:
+
+```bash
+$ bin/pulsar broker
+```
+
+You can start a broker in the background using the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) CLI tool:
+
+```bash
+$ bin/pulsar-daemon start broker
+```
+
+Once you've successfully started up all the brokers you intend to use, your Pulsar cluster should be ready to go!
+
+## Connecting to the running cluster
+
+Once your Pulsar cluster is up and running, you should be able to connect with it using Pulsar clients. One such client is the [`pulsar-client`](reference-cli-tools.md#pulsar-client) tool, which is included with the Pulsar binary package. The `pulsar-client` tool can publish messages to and consume messages from Pulsar topics and thus provides a simple way to make sure that your cluster is running properly.
+
+To use the `pulsar-client` tool, first modify the client configuration file in [`conf/client.conf`](reference-configuration.md#client) in your binary package. You'll need to change the values for `webServiceUrl` and `brokerServiceUrl`, substituting `localhost` (which is the default) with the DNS name that you've assigned to your broker/bookie hosts. Here's an example:
+
+```properties
+webServiceUrl=http://us-west.example.com:8080/
+brokerServiceUrl=pulsar://us-west.example.com:6650/
+```
+
+Once you've done that, you can publish a message to a Pulsar topic:
+
+```bash
+$ bin/pulsar-client produce \
+  persistent://public/default/test \
+  -n 1 \
+  -m "Hello, Pulsar"
+```
+
+> You may need to use a different cluster name in the topic if you specified a cluster name different from `pulsar-cluster-1`.
+
+This will publish a single message to the Pulsar topic.
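+
+You can likewise verify the consuming side with the `pulsar-client` tool. Run this in another terminal *before* producing, so that the subscription is in place to receive the message (the subscription name here is illustrative):
+
+```bash
+$ bin/pulsar-client consume \
+  persistent://public/default/test \
+  -s "test-subscription" \
+  -n 1
+```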
+
+## Running Functions
+
+> If you have [enabled](#enabling-pulsar-functions-optional) Pulsar Functions, you can also try out Pulsar Functions now.
+
+Create an `ExclamationFunction` named `exclamation`:
+
+```bash
+bin/pulsar-admin functions create \
+  --jar examples/api-examples.jar \
+  --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
+  --inputs persistent://public/default/exclamation-input \
+  --output persistent://public/default/exclamation-output \
+  --tenant public \
+  --namespace default \
+  --name exclamation
+```
+
+Check if the function is running as expected by [triggering](functions-deploying.md#triggering-pulsar-functions) the function.
+
+```bash
+bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
+```
+
+You will see output as below:
+
+```shell
+hello world!
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/functions-overview.md b/site2/website/versioned_docs/version-2.2.1/functions-overview.md
new file mode 100644
index 0000000000..de9e6f168e
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/functions-overview.md
@@ -0,0 +1,452 @@
+---
+id: version-2.2.1-functions-overview
+title: Pulsar Functions overview
+sidebar_label: Overview
+original_id: functions-overview
+---
+
+**Pulsar Functions** are lightweight compute processes that
+
+* consume messages from one or more Pulsar topics,
+* apply a user-supplied processing logic to each message,
+* publish the results of the computation to another topic
+
+Here's an example Pulsar Function for Java (using the [native interface](functions-api.md#java-native-functions)):
+
+```java
+import java.util.function.Function;
+
+public class ExclamationFunction implements Function<String, String> {
+    @Override
+    public String apply(String input) { return String.format("%s!", input); }
+}
+```
+
+Here's an equivalent function in Python (also using the [native interface](functions-api.md#python-native-functions)):
+
+```python
+def process(input):
+    return "{0}!".format(input)
+```
+
+Functions are executed each time a message is published to the input topic. If a function is listening on the topic `tweet-stream`, for example, then the function would be run each time a message is published to that topic.
+
+## Goals
+
+The core goal behind Pulsar Functions is to enable you to easily create processing logic of any level of complexity without needing to deploy a separate neighboring system (such as [Apache Storm](http://storm.apache.org/), [Apache Heron](https://apache.github.io/incubator-heron), [Apache Flink](https://flink.apache.org/), etc.). Pulsar Functions is essentially ready-made compute infrastructure at your disposal as part of your Pulsar messaging system. This core goal is tied to a series of other goals:
+
+* Developer productivity ([language-native](#language-native-functions) vs. [Pulsar Functions SDK](#the-pulsar-functions-sdk) functions)
+* Easy troubleshooting
+* Operational simplicity (no need for an external processing system)
+
+## Inspirations
+
+The Pulsar Functions feature was inspired by (and takes cues from) several systems and paradigms:
+
+* Stream processing engines such as [Apache Storm](http://storm.apache.org/), [Apache Heron](https://apache.github.io/incubator-heron), and [Apache Flink](https://flink.apache.org)
+* "Serverless" and "Function as a Service" (FaaS) cloud platforms like [Amazon Web Services Lambda](https://aws.amazon.com/lambda/), [Google Cloud Functions](https://cloud.google.com/functions/), and [Azure Cloud Functions](https://azure.microsoft.com/en-us/services/functions/)
+
+Pulsar Functions could be described as
+
+* [Lambda](https://aws.amazon.com/lambda/)-style functions that are
+* specifically designed to use Pulsar as a message bus
+
+## Programming model
+
+The core programming model behind Pulsar Functions is very simple:
+
+* Functions receive messages from one or more **input [topics](reference-terminology.md#topic)**. Every time a message is received, the function can do a variety of things:
+  * Apply some processing logic to the input and write output to:
+    * An **output topic** in Pulsar
+    * [Apache BookKeeper](#state-storage)
+  * Write logs to a **log topic** (potentially for debugging purposes)
+  * Increment a [counter](#word-count-example)
+
+![Pulsar Functions core programming model](assets/pulsar-functions-overview.png)
+
+### Word count example
+
+If you were to implement the classic word count example using Pulsar Functions, it might look something like this:
+
+![Pulsar Functions word count example](assets/pulsar-functions-word-count.png)
+
+If you were writing the function in [Java](functions-api.md#functions-for-java) using the [Pulsar Functions SDK for Java](functions-api.md#java-sdk-functions), you could write the function like this...
+
+```java
+package org.example.functions;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+import java.util.Arrays;
+
+public class WordCountFunction implements Function<String, Void> {
+    // This function is invoked every time a message is published to the input topic
+    @Override
+    public Void process(String input, Context context) {
+        Arrays.asList(input.split(" ")).forEach(word -> {
+            String counterKey = word.toLowerCase();
+            context.incrCounter(counterKey, 1);
+        });
+        return null;
+    }
+}
+```
+
+...and then [deploy it](#cluster-run-mode) in your Pulsar cluster using the [command line](#command-line-interface) like this:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --jar target/my-jar-with-dependencies.jar \
+  --classname org.example.functions.WordCountFunction \
+  --tenant public \
+  --namespace default \
+  --name word-count \
+  --inputs persistent://public/default/sentences \
+  --output persistent://public/default/count
+```
+
+### Content-based routing example
+
+The use cases for Pulsar Functions are essentially endless, but let's dig into a more sophisticated example that involves content-based routing.
+
+Imagine a function that takes items (strings) as input and publishes them to either a fruits or vegetables topic, depending on the item. Or, if an item is neither a fruit nor a vegetable, a warning is logged to a [log topic](#logging). Here's a visual representation:
+
+![Pulsar Functions routing example](assets/pulsar-functions-routing-example.png)
+
+If you were implementing this routing functionality in Python, it might look something like this:
+
+```python
+from pulsar import Function
+
+class RoutingFunction(Function):
+    def __init__(self):
+        self.fruits_topic = "persistent://public/default/fruits"
+        self.vegetables_topic = "persistent://public/default/vegetables"
+
+    def is_fruit(self, item):
+        return item in ["apple", "orange", "pear", "other fruits..."]
+
+    def is_vegetable(self, item):
+        return item in ["carrot", "lettuce", "radish", "other vegetables..."]
+
+    def process(self, item, context):
+        if self.is_fruit(item):
+            context.publish(self.fruits_topic, item)
+        elif self.is_vegetable(item):
+            context.publish(self.vegetables_topic, item)
+        else:
+            warning = "The item {0} is neither a fruit nor a vegetable".format(item)
+            context.get_logger().warn(warning)
+```
+
+## Command-line interface
+
+Pulsar Functions are managed using the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool (in particular the [`functions`](reference-pulsar-admin.md#functions) command). Here's an example command that would run a function in [local run mode](#local-run-mode):
+
+```bash
+$ bin/pulsar-admin functions localrun \
+  --inputs persistent://public/default/test_src \
+  --output persistent://public/default/test_result \
+  --jar examples/api-examples.jar \
+  --classname org.apache.pulsar.functions.api.examples.ExclamationFunction
+```
+
+## Fully Qualified Function Name (FQFN)
+
+Each Pulsar Function has a **Fully Qualified Function Name** (FQFN) that consists of three elements: the function's tenant, namespace, and function name. FQFNs look like this:
+
+```http
+tenant/namespace/name
+```
+
+FQFNs enable you to, for example, create multiple functions with the same name provided that they're in different namespaces.
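+
+For example, the following two (hypothetical) FQFNs name distinct functions even though both share the function name `my-function`:
+
+```
+tenant-a/ns-1/my-function
+tenant-b/ns-2/my-function
+```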
+
+## Configuration
+
+Pulsar Functions can be configured in two ways:
+
+* Via [command-line arguments](#command-line-interface) passed to the [`pulsar-admin functions`](reference-pulsar-admin.md#functions) interface
+* Via [YAML](http://yaml.org/) configuration files
+
+If you're supplying a YAML configuration, you must specify a path to the file on the command line. Here's an example:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --function-config-file ./my-function.yaml
+```
+
+And here's an example `my-function.yaml` file:
+
+```yaml
+name: my-function
+tenant: public
+namespace: default
+jar: ./target/my-functions.jar
+className: org.example.pulsar.functions.MyFunction
+inputs:
+- persistent://public/default/test_src
+output: persistent://public/default/test_result
+```
+
+You can also mix and match configuration methods by specifying some function attributes via the CLI and others via YAML configuration.
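+
+For example, you might keep most attributes in YAML but supply or override one on the command line. A sketch, assuming the `my-function.yaml` file above:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --function-config-file ./my-function.yaml \
+  --output persistent://public/default/another-topic
+```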
+
+## Supported languages
+
+Pulsar Functions can currently be written in [Java](functions-api.md#functions-for-java) and [Python](functions-api.md#functions-for-python). Support for additional languages is coming soon.
+
+## The Pulsar Functions API
+
+The Pulsar Functions API enables you to create processing logic that is:
+
+* Type safe. Pulsar Functions can process raw bytes or more complex, application-specific types.
+* Based on SerDe (**Ser**ialization/**De**serialization). A variety of types are supported "out of the box" but you can also create your own custom SerDe logic.
+
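+As a minimal Java sketch of what custom SerDe logic can look like, you implement the SDK's `SerDe` interface (the `Tweet` value class here is hypothetical, used only for illustration):
+
+```java
+import java.nio.charset.StandardCharsets;
+
+import org.apache.pulsar.functions.api.SerDe;
+
+// Hypothetical value type used only for this illustration
+class Tweet {
+    final String text;
+    Tweet(String text) { this.text = text; }
+}
+
+public class TweetSerDe implements SerDe<Tweet> {
+    @Override
+    public Tweet deserialize(byte[] input) {
+        // Interpret the raw bytes as a UTF-8 string
+        return new Tweet(new String(input, StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public byte[] serialize(Tweet tweet) {
+        // Encode the tweet text back to UTF-8 bytes
+        return tweet.text.getBytes(StandardCharsets.UTF_8);
+    }
+}
+```
+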
+### Function context
+
+Each Pulsar Function created using the [Pulsar Functions SDK](#the-pulsar-functions-sdk) has access to a context object that provides:
+
+1. A wide variety of information about the function, including:
+    * The name of the function
+    * The tenant and namespace of the function
+    * [User-supplied configuration](#user-configuration) values
+2. Special functionality, including:
+    * The ability to produce [logs](#logging) to a specified logging topic
+    * The ability to produce [metrics](#metrics)
+
+### Language-native functions
+
+Both Java and Python support writing "native" functions, i.e. Pulsar Functions with no dependencies.
+
+The benefit of native functions is that they don't have any dependencies beyond what's already available in Java/Python "out of the box." The downside is that they don't provide access to the function's [context](#function-context), which is necessary for a variety of functionality, including [logging](#logging), [user configuration](#user-configuration), and more.
+
+## The Pulsar Functions SDK
+
+If you'd like a Pulsar Function to have access to a [context object](#function-context), you can use the **Pulsar Functions SDK**, available for both [Java](functions-api.md#functions-for-java) and [Python](functions-api.md#functions-for-python).
+
+### Java
+
+Here's an example Java function that uses information about its context:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+public class ContextAwareFunction implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) {
+        Logger LOG = context.getLogger();
+        String functionTenant = context.getTenant();
+        String functionNamespace = context.getNamespace();
+        String functionName = context.getName();
+        LOG.info("Function tenant/namespace/name: {}/{}/{}", functionTenant, functionNamespace, functionName);
+        return null;
+    }
+}
+```
+
+### Python
+
+Here's an example Python function that uses information about its context:
+
+```python
+from pulsar import Function
+
+class ContextAwareFunction(Function):
+    def process(self, input, context):
+        log = context.get_logger()
+        function_tenant = context.get_function_tenant()
+        function_namespace = context.get_function_namespace()
+        function_name = context.get_function_name()
+        log.info("Function tenant/namespace/name: {0}/{1}/{2}".format(function_tenant, function_namespace, function_name))
+```
+
+## Deployment
+
+The Pulsar Functions feature was built to support a variety of deployment options. At the moment, there are two ways to run Pulsar Functions:
+
+Deployment mode | Description
+:---------------|:-----------
+[Local run mode](#local-run-mode) | The function runs in your local environment, for example on your laptop
+[Cluster mode](#cluster-run-mode) | The function runs *inside of* your Pulsar cluster, on the same machines as your Pulsar [brokers](reference-terminology.md#broker)
+
+### Local run mode
+
+If you run a Pulsar Function in **local run** mode, it will run on the machine from which the command is run (this could be your laptop, an [AWS EC2](https://aws.amazon.com/ec2/) instance, etc.). Here's an example [`localrun`](reference-pulsar-admin.md#localrun) command:
+
+```bash
+$ bin/pulsar-admin functions localrun \
+  --py myfunc.py \
+  --classname myfunc.SomeFunction \
+  --inputs persistent://public/default/input-1 \
+  --output persistent://public/default/output-1
+```
+
+By default, the function will connect to a Pulsar cluster running on the same machine, via a local broker service URL of `pulsar://localhost:6650`. If you'd like to use local run mode to run a function but connect it to a non-local Pulsar cluster, you can specify a different broker URL using the `--broker-service-url` flag. Here's an example:
+
+```bash
+$ bin/pulsar-admin functions localrun \
+  --broker-service-url pulsar://my-cluster-host:6650 \
+  # Other function parameters
+```
+
+### Cluster run mode
+
+When you run a Pulsar Function in **cluster mode**, the function code will be uploaded to a Pulsar broker and run *alongside the broker* rather than in your [local environment](#local-run-mode). You can run a function in cluster mode using the [`create`](reference-pulsar-admin.md#create-1) command. Here's an example:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --py myfunc.py \
+  --classname myfunc.SomeFunction \
+  --inputs persistent://public/default/input-1 \
+  --output persistent://public/default/output-1
+```
+
+This command will upload `myfunc.py` to Pulsar, which will use the code to start one [or more](#parallelism) instances of the function.
+
+### Parallelism
+
+By default, only one **instance** of a Pulsar Function runs when you create and run it in [cluster run mode](#cluster-run-mode). You can also, however, run multiple instances in parallel. You can specify the number of instances when you create the function, or update an existing single-instance function with a new parallelism factor.
+
+This command, for example, would create and run a function with a parallelism of 5 (i.e. 5 instances):
+
+```bash
+$ bin/pulsar-admin functions create \
+  --name parallel-fun \
+  --tenant public \
+  --namespace default \
+  --py func.py \
+  --classname func.ParallelFunction \
+  --parallelism 5
+```
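+
+To change the parallelism of an existing function, the `update` command accepts the same flag. A sketch, reusing the function above:
+
+```bash
+$ bin/pulsar-admin functions update \
+  --name parallel-fun \
+  --tenant public \
+  --namespace default \
+  --py func.py \
+  --classname func.ParallelFunction \
+  --parallelism 8
+```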
+
+### Function instance resources
+
+When you run Pulsar Functions in [cluster run](#cluster-run-mode) mode, you can specify the resources that are assigned to each function [instance](#parallelism):
+
+Resource | Specified as... | Runtimes
+:--------|:----------------|:--------
+CPU | The number of cores | Docker (coming soon)
+RAM | The number of bytes | Process, Docker
+Disk space | The number of bytes | Docker
+
+Here's an example function creation command that allocates 8 cores, 8 GB of RAM, and 10 GB of disk space to a function:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --jar target/my-functions.jar \
+  --classname org.example.functions.MyFunction \
+  --cpu 8 \
+  --ram 8589934592 \
+  --disk 10737418240
+```
+
+For more information on resources, see the [Deploying and Managing Pulsar Functions](functions-deploying.md#resources) documentation.
+
+### Logging
+
+Pulsar Functions created using the [Pulsar Functions SDK](#the-pulsar-functions-sdk) can send logs to a log topic that you specify as part of the function's configuration. The function created using the command below, for example, would produce all logs on the `persistent://public/default/my-func-1-log` topic:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --name my-func-1 \
+  --log-topic persistent://public/default/my-func-1-log \
+  # Other configs
+```
+
+Here's an example [Java function](functions-api.md#java-logging) that logs at different log levels based on the function's input:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+public class LoggerFunction implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) {
+        Logger LOG = context.getLogger();
+        if (input.length() <= 100) {
+            LOG.info("This string has a length of {}", input);
+        } else {
+            LOG.warn("This string is getting too long! It has {} characters", input);
+        }
+        return null;
+    }
+}
+```
+
+### User configuration
+
+Pulsar Functions can be passed arbitrary key-values via the command line (both keys and values must be strings). This set of key-values is called the function's **user configuration**. User configurations must consist of JSON strings.
+
+Here's an example of passing a user configuration to a function:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --user-config '{"key-1":"value-1","key-2":"value-2"}' \
+  # Other configs
+```
+
+Here's an example of a function that accesses that config map:
+
+```java
+public class ConfigMapFunction implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) {
+        String val1 = context.getUserConfigValue("key-1").get();
+        String val2 = context.getUserConfigValue("key-2").get();
+        context.getLogger().info("The user-supplied values are {} and {}", val1, val2);
+        return null;
+    }
+}
+```
+
+### Triggering Pulsar Functions
+
+Pulsar Functions running in [cluster mode](#cluster-run-mode) can be [triggered](functions-deploying.md#triggering-pulsar-functions) via the [command line](#command-line-interface). With triggering you can easily pass a specific value to a function and get the function's return value *without* needing to worry about creating a client, sending a message to the right input topic, etc. Triggering can be very useful for---but is by no means limited to---testing and debugging purposes.
+
+> Triggering a function is ultimately no different from invoking a function by producing a message on one of the function's input topics. The [`pulsar-admin functions trigger`](reference-pulsar-admin.md#trigger) command is essentially a convenient mechanism for sending messages to functions without needing to use the [`pulsar-client`](reference-cli-tools.md#pulsar-client) tool or a language-specific client library.
+
+Let's take an example Pulsar Function written in Python (using the [native interface](functions-api.md#python-native-functions)) that simply reverses string inputs:
+
+```python
+def process(input):
+    return input[::-1]
+```
+
+If that function were running in a Pulsar cluster, it could be triggered like this:
+
+```bash
+$ bin/pulsar-admin functions trigger \
+  --tenant public \
+  --namespace default \
+  --name reverse-func \
+  --trigger-value "snoitcnuf raslup ot emoclew"
+```
+
+That should return `welcome to pulsar functions` as the console output.
+
+> Instead of passing in a string via the CLI, you can also trigger a Pulsar Function with the contents of a file using the `--triggerFile` flag.
+
+## Processing guarantees
+
+The Pulsar Functions feature provides three different messaging semantics that you can apply to any function:
+
+Delivery semantics | Description
+:------------------|:-------
+**At-most-once** delivery | Each message sent to the function is processed at most once: it may never be processed at all, but it will never be processed twice (hence the "at most")
+**At-least-once** delivery | Each message sent to the function is guaranteed to be processed, but it could be processed more than once (hence the "at least")
+**Effectively-once** delivery | Each message sent to the function will have exactly one output associated with it
+
+This command, for example, would run a function in [cluster mode](#cluster-run-mode) with effectively-once guarantees applied:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --name my-effectively-once-function \
+  --processing-guarantees EFFECTIVELY_ONCE \
+  # Other function configs
+```
+
+## Metrics
+
+Pulsar Functions that use the [Pulsar Functions SDK](#the-pulsar-functions-sdk) can publish metrics to Pulsar. For more information, see [Metrics for Pulsar Functions](functions-metrics.md).
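+
+As a minimal Java sketch, a function can record a user-defined metric via its context each time it processes a message:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+public class InputLengthMetricFunction implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) {
+        // Record the input length as a user-defined metric
+        context.recordMetric("input-length", input.length());
+        return null;
+    }
+}
+```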
+
+## State storage
+
+Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. All Pulsar installations, including local standalone installations, include a deployment of BookKeeper bookies.
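+
+The counter API used in the [word count example](#word-count-example) above both writes to and reads from this BookKeeper-backed state. A minimal sketch, assuming the SDK's `getCounter` accessor alongside `incrCounter`:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+public class CounterFunction implements Function<String, Long> {
+    @Override
+    public Long process(String input, Context context) {
+        // Increment a per-key counter stored in BookKeeper...
+        context.incrCounter(input, 1);
+        // ...and read the current value back
+        return context.getCounter(input);
+    }
+}
+```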
diff --git a/site2/website/versioned_docs/version-2.2.1/getting-started-docker.md b/site2/website/versioned_docs/version-2.2.1/getting-started-docker.md
new file mode 100644
index 0000000000..d1123a407a
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/getting-started-docker.md
@@ -0,0 +1,168 @@
+---
+id: version-2.2.1-standalone-docker
+title: Start a standalone cluster with Docker
+sidebar_label: Pulsar in Docker
+original_id: standalone-docker
+---
+
+For the purposes of local development and testing, you can run Pulsar in standalone
+mode on your own machine within a Docker container.
+
+If you don't have Docker installed, you can download the [Community edition](https://www.docker.com/community-edition)
+and follow the instructions for your OS.
+
+## Starting Pulsar inside Docker
+
+```shell
+$ docker run -it \
+  -p 6650:6650 \
+  -p 8080:8080 \
+  -v $PWD/data:/pulsar/data \
+  apachepulsar/pulsar:{{site.current_version}} \
+  bin/pulsar standalone
+```
+
+Under Windows (in PowerShell), you should use something like the following command, with backticks as line continuations and the host path lowercased:
+
+```shell
+$ docker run -it `
+  -p 6650:6650 `
+  -p 8080:8080 `
+  -v $("$PWD/data:/pulsar/data".ToLower()) `
+  apachepulsar/pulsar:{{site.current_version}} `
+  bin/pulsar standalone
+```
+
+A few things to note about this command:
+ * `$PWD/data`: On Windows, the Docker host directory must be lowercase. `$PWD/data` expands to your current directory, for example `e:/data`.
+ * `-v $PWD/data:/pulsar/data`: This makes the process inside the container store its data and
+   metadata in the filesystem outside the container, so that it doesn't start "fresh" every
+   time the container is restarted.
+
+If Pulsar has been successfully started, you should see `INFO`-level log messages like this:
+
+```
+2017-08-09 22:34:04,030 - INFO  - [main:WebService@213] - Web Service started at http://127.0.0.1:8080
+2017-08-09 22:34:04,038 - INFO  - [main:PulsarService@335] - messaging service is ready, bootstrap service on port=8080, broker url=pulsar://127.0.0.1:6650, cluster=standalone, configs=org.apache.pulsar.broker.ServiceConfiguration@4db60246
+...
+```
+
+
+> #### Automatically created namespace
+> When you start a local standalone cluster, Pulsar will automatically create a `public/default`
+> namespace that you can use for development purposes. All Pulsar topics are managed within namespaces.
+> For more info, see [Topics](concepts-messaging.md#topics).
+
+
+## Start publishing and consuming messages
+
+Pulsar currently offers client libraries for [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python.md) 
+and [C++](client-libraries-cpp.md). If you're running a local standalone cluster, you can
+use one of these root URLs for interacting with your cluster:
+
+* `pulsar://localhost:6650`
+* `http://localhost:8080`
+
+Here's an example that lets you quickly get started with Pulsar by using the [Python](client-libraries-python.md)
+client API.
+
+You can install the Pulsar Python client library directly from [PyPI](https://pypi.org/project/pulsar-client/):
+
+```shell
+$ pip install pulsar-client
+```
+
+First create a consumer and subscribe to the topic:
+
+```python
+import pulsar
+
+client = pulsar.Client('pulsar://localhost:6650')
+consumer = client.subscribe('my-topic',
+                            subscription_name='my-sub')
+
+while True:
+    msg = consumer.receive()
+    print("Received message: '%s'" % msg.data())
+    consumer.acknowledge(msg)
+
+client.close()
+```
+
+Now we can start a producer to send some test messages:
+
+```python
+import pulsar
+
+client = pulsar.Client('pulsar://localhost:6650')
+producer = client.create_producer('my-topic')
+
+for i in range(10):
+    producer.send(('hello-pulsar-%d' % i).encode('utf-8'))
+
+client.close()
+```
+
+
+## Get the topic statistics
+
+In Pulsar you can use REST, Java, or command-line tools to control every aspect of the system.
+You can find detailed documentation of all the APIs in the [Admin API Overview](admin-api-overview.md).
+
+In the simplest example, you can use curl to probe the stats for a particular topic:
+
+```shell
+$ curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool
+```
+
+The output will be something like this:
+
+```json
+{
+  "averageMsgSize": 0.0,
+  "msgRateIn": 0.0,
+  "msgRateOut": 0.0,
+  "msgThroughputIn": 0.0,
+  "msgThroughputOut": 0.0,
+  "publishers": [
+    {
+      "address": "/172.17.0.1:35048",
+      "averageMsgSize": 0.0,
+      "clientVersion": "1.19.0-incubating",
+      "connectedSince": "2017-08-09 20:59:34.621+0000",
+      "msgRateIn": 0.0,
+      "msgThroughputIn": 0.0,
+      "producerId": 0,
+      "producerName": "standalone-0-1"
+    }
+  ],
+  "replication": {},
+  "storageSize": 16,
+  "subscriptions": {
+    "my-sub": {
+      "blockedSubscriptionOnUnackedMsgs": false,
+      "consumers": [
+        {
+          "address": "/172.17.0.1:35064",
+          "availablePermits": 996,
+          "blockedConsumerOnUnackedMsgs": false,
+          "clientVersion": "1.19.0-incubating",
+          "connectedSince": "2017-08-09 21:05:39.222+0000",
+          "consumerName": "166111",
+          "msgRateOut": 0.0,
+          "msgRateRedeliver": 0.0,
+          "msgThroughputOut": 0.0,
+          "unackedMessages": 0
+        }
+      ],
+      "msgBacklog": 0,
+      "msgRateExpired": 0.0,
+      "msgRateOut": 0.0,
+      "msgRateRedeliver": 0.0,
+      "msgThroughputOut": 0.0,
+      "type": "Exclusive",
+      "unackedMessages": 0
+    }
+  }
+}
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/getting-started-standalone.md b/site2/website/versioned_docs/version-2.2.1/getting-started-standalone.md
new file mode 100644
index 0000000000..48e8b7a1d9
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/getting-started-standalone.md
@@ -0,0 +1,198 @@
+---
+id: version-2.2.1-standalone
+title: Setting up a local standalone cluster
+sidebar_label: Run Pulsar locally
+original_id: standalone
+---
+
+For the purposes of local development and testing, you can run Pulsar in standalone mode on your own machine. Standalone mode includes a Pulsar broker as well as the necessary ZooKeeper and BookKeeper components running inside of a single Java Virtual Machine (JVM) process.
+
+> #### Pulsar in production? 
+> If you're looking to run a full production Pulsar installation, see the [Deploying a Pulsar instance](deploy-bare-metal.md) guide.
+
+## Run Pulsar Standalone Manually
+
+### System requirements
+
+Pulsar is currently available for **MacOS** and **Linux**. In order to use Pulsar, you'll need to install [Java 8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html).
+
+
+### Installing Pulsar
+
+To get started running Pulsar, download a binary tarball release in one of the following ways:
+
+* by clicking the link below and downloading the release from an Apache mirror:
+
+  * <a href="pulsar:binary_release_url" download>Pulsar {{pulsar:version}} binary release</a>
+
+* from the Pulsar [downloads page](pulsar:download_page_url)
+* from the Pulsar [releases page](https://github.com/apache/pulsar/releases/latest)
+* using [wget](https://www.gnu.org/software/wget):
+
+  ```shell
+  $ wget pulsar:binary_release_url
+  ```
+
+Once the tarball is downloaded, untar it and `cd` into the resulting directory:
+
+```bash
+$ tar xvfz apache-pulsar-{{pulsar:version}}-bin.tar.gz
+$ cd apache-pulsar-{{pulsar:version}}
+```
+
+### What your package contains
+
+The Pulsar binary package initially contains the following directories:
+
+Directory | Contains
+:---------|:--------
+`bin` | Pulsar's command-line tools, such as [`pulsar`](reference-cli-tools.md#pulsar) and [`pulsar-admin`](reference-pulsar-admin.md)
+`conf` | Configuration files for Pulsar, including for [broker configuration](reference-configuration.md#broker), [ZooKeeper configuration](reference-configuration.md#zookeeper), and more
+`examples` | A Java JAR file containing example [Pulsar Functions](functions-overview.md)
+`lib` | The [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) files used by Pulsar
+`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](developing-codebase.md)
+
+These directories will be created once you begin running Pulsar:
+
+Directory | Contains
+:---------|:--------
+`data` | The data storage directory used by ZooKeeper and BookKeeper
+`instances` | Artifacts created for [Pulsar Functions](functions-overview.md)
+`logs` | Logs created by the installation
+
+
+### Installing Builtin Connectors
+
+Since release `2.1.0-incubating`, Pulsar has shipped a separate binary distribution that contains all the `builtin` connectors.
+If you would like to enable those `builtin` connectors, you can download the connectors tarball release in one of the following ways:
+
+* by clicking the link below and downloading the release from an Apache mirror:
+
+  * <a href="pulsar:connector_release_url" download>Pulsar IO Connectors {{pulsar:version}} release</a>
+
+* from the Pulsar [downloads page](pulsar:download_page_url)
+* from the Pulsar [releases page](https://github.com/apache/pulsar/releases/latest)
+* using [wget](https://www.gnu.org/software/wget):
+
+  ```shell
+  $ wget pulsar:connector_release_url
+  ```
+
+Once the tarball is downloaded, untar the io-connectors package and copy the connectors directory
+into the pulsar directory as `connectors`:
+
+```bash
+$ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz
+
+# you will find a directory named `apache-pulsar-io-connectors-{{pulsar:version}}` in the pulsar directory
+# then copy the connectors
+
+$ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors
+
+$ ls connectors
+pulsar-io-aerospike-{{pulsar:version}}.nar
+pulsar-io-cassandra-{{pulsar:version}}.nar
+pulsar-io-kafka-{{pulsar:version}}.nar
+pulsar-io-kinesis-{{pulsar:version}}.nar
+pulsar-io-rabbitmq-{{pulsar:version}}.nar
+pulsar-io-twitter-{{pulsar:version}}.nar
+...
+```
+
+> #### NOTES
+>
+> If you are running Pulsar in a bare metal cluster, make sure the `connectors` tarball is unzipped into every broker's pulsar directory
+> (or into every function-worker's pulsar directory if you are running a separate worker cluster for Pulsar functions).
+>
+> If you are [running Pulsar in Docker](getting-started-docker.md) or deploying Pulsar using a docker image (e.g. [K8S](deploy-kubernetes.md) or [DCOS](deploy-dcos.md)),
+> you can use the `apachepulsar/pulsar-all` image instead of the `apachepulsar/pulsar` image, since `apachepulsar/pulsar-all` already bundles [all builtin connectors](io-overview.md#working-with-connectors).
+
+### Starting the cluster
+
+Once you have an up-to-date local copy of the release, you can start up a local cluster using the [`pulsar`](reference-cli-tools.md#pulsar) command, which is stored in the `bin` directory, and specifying that you want to start up Pulsar in standalone mode:
+
+```bash
+$ bin/pulsar standalone
+```
+
+If Pulsar has been successfully started, you should see `INFO`-level log messages like this:
+
+```bash
+2017-06-01 14:46:29,192 - INFO  - [main:WebSocketService@95] - Global Zookeeper cache started
+2017-06-01 14:46:29,192 - INFO  - [main:AuthenticationService@61] - Authentication is disabled
+2017-06-01 14:46:29,192 - INFO  - [main:WebSocketService@108] - Pulsar WebSocket Service started
+```
+
+> #### Automatically created namespace
+> When you start a local standalone cluster, Pulsar will automatically create a `public/default` [namespace](concepts-messaging.md#namespaces) that you can use for development purposes. All Pulsar topics are managed within namespaces. For more info, see [Topics](concepts-messaging.md#topics).
+
+## Run Pulsar Standalone in Docker
+
+Alternatively, you can run Pulsar standalone locally in Docker.
+
+```bash
+docker run -it -p 80:80 -p 8080:8080 -p 6650:6650 apachepulsar/pulsar-standalone
+```
+
+The command forwards the following ports to localhost:
+
+- 80: the port for the Pulsar dashboard
+- 8080: the HTTP service port of the Pulsar service
+- 6650: the binary protocol port of the Pulsar service
+
+After the docker container is running, you can access the dashboard at http://localhost.
+
+## Testing your cluster setup
+
+Pulsar provides a CLI tool called [`pulsar-client`](reference-cli-tools.md#pulsar-client) that enables you to do things like send messages to a Pulsar topic in a running cluster. This command will send a simple message saying `hello-pulsar` to the `my-topic` topic:
+
+```bash
+$ bin/pulsar-client produce my-topic --messages "hello-pulsar"
+```
+
+If the message has been successfully published to the topic, you should see a confirmation like this in the `pulsar-client` logs:
+
+```
+13:09:39.356 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
+```
+
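+You can likewise consume messages with `pulsar-client`. Note that a subscription only receives messages published after it is created, so start the consumer before producing:
+
+```bash
+$ bin/pulsar-client consume my-topic -s "my-sub" -n 1
+```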
+
+> #### No need to explicitly create new topics
+> You may have noticed that we did not explicitly create the `my-topic` topic to which we sent the `hello-pulsar` message. If you attempt to write a message to a topic that does not yet exist, Pulsar will automatically create that topic for you.
+
+## Using Pulsar clients locally
+
+Pulsar currently offers client libraries for [Java](client-libraries-java.md),  [Go](client-libraries-go.md), [Python](client-libraries-python.md) and [C++](client-libraries-cpp.md). If you're running a local standalone cluster, you can use one of these root URLs for interacting with your cluster:
+
+* `http://localhost:8080`
+* `pulsar://localhost:6650`
+
+Here's an example producer for a Pulsar topic using the [Java](client-libraries-java.md) client:
+
+```java
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+String localClusterUrl = "pulsar://localhost:6650";
+
+PulsarClient client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
+Producer<byte[]> producer = client.newProducer().topic("my-topic").create();
+```
+
+Here's an example [Python](client-libraries-python.md) producer:
+
+```python
+import pulsar
+
+client = pulsar.Client('pulsar://localhost:6650')
+producer = client.create_producer('my-topic')
+```
+
+Finally, here's an example [C++](client-libraries-cpp.md) producer:
+
+```cpp
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+Client client("pulsar://localhost:6650");
+Producer producer;
+Result result = client.createProducer("my-topic", producer);
+if (result != ResultOk) {
+    LOG_ERROR("Error creating producer: " << result);
+    return -1;
+}
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/io-cdc.md b/site2/website/versioned_docs/version-2.2.1/io-cdc.md
new file mode 100644
index 0000000000..b1c96be5ed
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/io-cdc.md
@@ -0,0 +1,147 @@
+---
+id: version-2.2.1-io-cdc
+title: CDC Connector
+sidebar_label: CDC Connector
+original_id: io-cdc
+---
+
+## Source
+
+The CDC Source connector captures the change log of existing databases such as MySQL, MongoDB, and PostgreSQL and writes the changes into Pulsar.
+
+The CDC Source connector is built on top of [Debezium](https://debezium.io/). The connector stores all data in the Pulsar cluster in a persistent, replicated, and partitioned way.
+This CDC Source has been tested with MySQL; you can find more information about how it works at [this link](https://debezium.io/docs/connectors/mysql/).
+For details on how Debezium works, refer to the [Debezium tutorial](https://debezium.io/docs/tutorial/). It is recommended that you go through this tutorial first.
+
+### Source Configuration Options
+
+The configuration mostly mirrors the Debezium task configuration; in addition, you must provide the service URL of the Pulsar cluster and the names of the topics used to store offsets and history.
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `task.class` | `true` | `null` | A source task class implemented in Debezium. |
+| `database.hostname` | `true` | `null` | The address of the database server. |
+| `database.port` | `true` | `null` | The port number of the database server. |
+| `database.user` | `true` | `null` | The name of the Database user that has the required privileges. |
+| `database.password` | `true` | `null` | The password for the Database user that has the required privileges. |
+| `database.server.id` | `true` | `null` | The connector’s identifier that must be unique within the Database cluster and similar to Database’s server-id configuration property. |
+| `database.server.name` | `true` | `null` | The logical name of the Database server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used. |
+| `database.whitelist` | `false` | `null` | A list of all databases hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the databases and tables to include or exclude from monitoring. |
+| `key.converter` | `true` | `null` | The converter provided by Kafka Connect to convert record key. |
+| `value.converter` | `true` | `null` | The converter provided by Kafka Connect to convert record value.  |
+| `database.history` | `true` | `null` | The name of the database history class. |
+| `database.history.pulsar.topic` | `true` | `null` | The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers. |
+| `database.history.pulsar.service.url` | `true` | `null` | Pulsar cluster service url for history topic. |
+| `pulsar.service.url` | `true` | `null` | Pulsar cluster service url. |
+| `offset.storage.topic` | `true` | `null` | The topic that records the last committed offsets that the connector successfully processed. |
+
+### Configuration Example
+
+Here is an example JSON configuration:
+
+```json
+{
+    "tenant": "public",
+    "namespace": "default",
+    "name": "debezium-kafka-source",
+    "className": "org.apache.pulsar.io.kafka.connect.KafkaConnectSource" ,
+    "topicName": "kafka-connect-topic",
+    "configs":
+    {
+        "task.class": "io.debezium.connector.mysql.MySqlConnectorTask",
+        "database.hostname": "localhost",
+        "database.port": "3306",
+        "database.user": "debezium",
+        "database.password": "dbz",
+        "database.server.id": "184054",
+        "database.server.name": "dbserver1",
+        "database.whitelist": "inventory",
+        "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
+        "database.history.pulsar.topic": "history-topic",
+        "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
+        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+        "pulsar.service.url": "pulsar://127.0.0.1:6650",
+        "offset.storage.topic": "offset-topic"
+    },
+    "archive": "connectors/pulsar-io-kafka-connect-adaptor-2.3.0-SNAPSHOT.nar"
+}
+```
+
+You can also find a YAML example in this [file](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml), with contents similar to the following:
+
+```yaml
+tenant: "public"
+namespace: "default"
+name: "debezium-kafka-source"
+topicName: "kafka-connect-topic"
+archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0-SNAPSHOT.nar"
+
+##autoAck: true
+parallelism: 1
+
+configs:
+  ## sourceTask
+  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
+
+  ## config for mysql, docker image: debezium/example-mysql:0.8
+  database.hostname: "localhost"
+  database.port: "3306"
+  database.user: "debezium"
+  database.password: "dbz"
+  database.server.id: "184054"
+  database.server.name: "dbserver1"
+  database.whitelist: "inventory"
+
+  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
+  database.history.pulsar.topic: "history-topic"
+  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
+  key.converter: "org.apache.kafka.connect.json.JsonConverter"
+  value.converter: "org.apache.kafka.connect.json.JsonConverter"
+  ## PULSAR_SERVICE_URL_CONFIG
+  pulsar.service.url: "pulsar://127.0.0.1:6650"
+  ## OFFSET_STORAGE_TOPIC_CONFIG
+  offset.storage.topic: "offset-topic"
+```
+
+### Usage example
+
+Here is a simple example that stores MySQL change data using the above configuration.
+
+- Start a MySQL server with an example database, from which Debezium can capture changes.
+```bash
+ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8
+```
+
+- Start a Pulsar service locally in standalone mode.
+```bash
+ bin/pulsar standalone
+```
+
+- Start the Pulsar Debezium connector in local run mode, using the above YAML config file. Make sure the nar file is available at the configured path `connectors/pulsar-io-kafka-connect-adaptor-2.3.0-SNAPSHOT.nar`.
+```bash
+ bin/pulsar-admin source localrun --sourceConfigFile debezium-mysql-source-config.yaml
+```
+
+- Subscribe to the topic for the table `inventory.products`.
+```bash
+ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0
+```
+
+- Start a MySQL CLI Docker container; we can use it to modify the `products` table on the MySQL server.
+```bash
+$ docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
+```
+
+This command opens a MySQL CLI. In the CLI, we can modify the `products` table; use the commands below to change the names of two items:
+
+```bash
+mysql> use inventory;
+mysql> show tables;
+mysql> SELECT * FROM  products ;
+mysql> UPDATE products SET name='1111111111' WHERE id=101;
+mysql> UPDATE products SET name='1111111111' WHERE id=107;
+```
+
+- In the terminal tab where you subscribed to the topic, you can see that the two changes have been published to the products topic.
diff --git a/site2/website/versioned_docs/version-2.2.1/io-connectors.md b/site2/website/versioned_docs/version-2.2.1/io-connectors.md
new file mode 100644
index 0000000000..d48c1dfb37
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/io-connectors.md
@@ -0,0 +1,21 @@
+---
+id: version-2.2.1-io-connectors
+title: Builtin Connectors
+sidebar_label: Builtin Connectors
+original_id: io-connectors
+---
+
+The Pulsar distribution includes a set of common connectors that have been packaged and tested with the rest of Apache Pulsar.
+These connectors import and export data from some of the most commonly used data systems. Using any of these connectors is
+as easy as writing a simple connector configuration and running the connector locally or submitting the connector to a
+Pulsar Functions cluster.
+
+- [Aerospike Sink Connector](io-aerospike.md)
+- [Cassandra Sink Connector](io-cassandra.md)
+- [Kafka Sink Connector](io-kafka.md#sink)
+- [Kafka Source Connector](io-kafka.md#source)
+- [Kinesis Sink Connector](io-kinesis.md#sink)
+- [RabbitMQ Source Connector](io-rabbitmq.md#source)
+- [Twitter Firehose Source Connector](io-twitter.md)
+- [CDC Source Connector based on Debezium](io-cdc.md)
+- [Netty Tcp Source Connector](io-tcp.md#source)
diff --git a/site2/website/versioned_docs/version-2.2.1/io-overview.md b/site2/website/versioned_docs/version-2.2.1/io-overview.md
new file mode 100644
index 0000000000..835f269f3c
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/io-overview.md
@@ -0,0 +1,41 @@
+---
+id: version-2.2.1-io-overview
+title: Pulsar IO Overview
+sidebar_label: Overview
+original_id: io-overview
+---
+
+Messaging systems are most powerful when you can easily use them in conjunction with external systems like databases and other messaging systems. **Pulsar IO** is a feature of Pulsar that enables you to easily create, deploy, and manage Pulsar **connectors** that interact with external systems, such as [Apache Cassandra](https://cassandra.apache.org), [Aerospike](https://www.aerospike.com), and many others.
+
+> #### Pulsar IO and Pulsar Functions
+> Under the hood, Pulsar IO connectors are specialized [Pulsar Functions](functions-overview.md) purpose-built to interface with external systems. The [administrative interface](io-quickstart.md) for Pulsar IO is, in fact, quite similar to that of Pulsar Functions.
+
+## Sources and sinks
+
+Pulsar IO connectors come in two types:
+
+* **Sources** feed data *into* Pulsar from other systems. Common sources include other messaging systems and "firehose"-style data pipeline APIs.
+* **Sinks** are fed data *from* Pulsar. Common sinks include other messaging systems and SQL and NoSQL databases.
+
+This diagram illustrates the relationship between sources, sinks, and Pulsar:
+
+![Pulsar IO diagram](assets/pulsar-io.png "Pulsar IO connectors (sources and sinks)")
+
+## Working with connectors
+
+Pulsar IO connectors can be managed via the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool, in particular the [`source`](reference-pulsar-admin.md#source) and [`sink`](reference-pulsar-admin.md#sink) commands.
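+
+For example, as shown in the [CDC guide](io-cdc.md), a source can be run locally from a YAML config file (the file name here is a placeholder):
+
+```bash
+$ bin/pulsar-admin source localrun --sourceConfigFile ./my-source-config.yaml
+```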
+
+> For a guide to managing connectors in your Pulsar installation, see the [Getting started with Pulsar IO](io-quickstart.md)
+
+The following connectors are currently available for Pulsar:
+
+|Name|Java Class|Documentation|
+|---|---|---|
+|[Aerospike sink](https://www.aerospike.com/)|[`org.apache.pulsar.io.aerospike.AerospikeSink`](https://github.com/apache/pulsar/blob/master/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java)|[Documentation](io-aerospike.md)|
+|[Cassandra sink](https://cassandra.apache.org)|[`org.apache.pulsar.io.cassandra.CassandraSink`](https://github.com/apache/pulsar/blob/master/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java)|[Documentation](io-cassandra.md)|
+|[Kafka source](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java)|[Documentation](io-kafka.md#source)|
+|[Kafka sink](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSink`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java)|[Documentation](io-kafka.md#sink)|
+|[Kinesis sink](https://aws.amazon.com/kinesis/)|[`org.apache.pulsar.io.kinesis.KinesisSink`](https://github.com/apache/pulsar/blob/master/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java)|[Documentation](io-kinesis.md#sink)|
+|[RabbitMQ source](https://www.rabbitmq.com)|[`org.apache.pulsar.io.rabbitmq.RabbitMQSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java)|[Documentation](io-rabbitmq.md#sink)|
+|[Twitter Firehose source](https://developer.twitter.com/en/docs)|[`org.apache.pulsar.io.twitter.TwitterFireHose`](https://github.com/apache/pulsar/blob/master/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java)|[Documentation](io-twitter.md#source)|
+|[CDC Connector](https://debezium.io/)|[`org.apache.pulsar.io.kafka.connect.KafkaConnectSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java)|[Documentation](io-cdc.md)|
diff --git a/site2/website/versioned_docs/version-2.2.1/io-rabbitmq.md b/site2/website/versioned_docs/version-2.2.1/io-rabbitmq.md
new file mode 100644
index 0000000000..dd2d8676c1
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/io-rabbitmq.md
@@ -0,0 +1,20 @@
+---
+id: version-2.2.1-io-rabbitmq
+title: RabbitMQ Connector
+sidebar_label: RabbitMQ Connector
+original_id: io-rabbitmq
+---
+
+## Source
+
+The RabbitMQ Source connector is used for receiving messages from a RabbitMQ cluster and writing
+messages to Pulsar topics.
+
+### Source Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `connectionName` | `true` | `null` | A new broker connection name. |
+| `amqUri` | `true` | `null` | An AMQP URI: host, port, username, password and virtual host. |
+| `queueName` | `true` | `null` | RabbitMQ queue name. |
+
diff --git a/site2/website/versioned_docs/version-2.2.1/io-tcp.md b/site2/website/versioned_docs/version-2.2.1/io-tcp.md
new file mode 100644
index 0000000000..ec1c98f831
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/io-tcp.md
@@ -0,0 +1,20 @@
+---
+id: version-2.2.1-io-tcp
+title: Netty Tcp Connector
+sidebar_label: Netty Tcp Connector
+original_id: io-tcp
+---
+
+## Source
+
+The Netty TCP Source connector listens for TCP messages from TCP clients and writes them to a user-defined Pulsar topic.
+This connector is best used in a containerized (e.g. Kubernetes) deployment;
+otherwise, if the connector runs in process or thread mode, the instances may conflict with each other when listening on ports.
+
+### Source Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `host` | `false` | `127.0.0.1` | The host name or address on which the source instance listens. |
+| `port` | `false` | `10999` | The port on which the source instance listens. |
+| `numberOfThreads` | `false` | `1` | The number of threads the Netty TCP server uses to accept incoming connections and handle the traffic of accepted connections. |
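+
+A configuration sketch for these options; the `configs` wrapper is an assumption following the same pattern as the other connector examples:
+
+```yaml
+configs:
+  host: "0.0.0.0"
+  port: 10999
+  numberOfThreads: 5
+```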
diff --git a/site2/website/versioned_docs/version-2.2.1/reference-configuration.md b/site2/website/versioned_docs/version-2.2.1/reference-configuration.md
new file mode 100644
index 0000000000..a5b7f0ca85
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/reference-configuration.md
@@ -0,0 +1,479 @@
+---
+id: version-2.2.1-reference-configuration
+title: Pulsar configuration
+sidebar_label: Pulsar configuration
+original_id: reference-configuration
+---
+
+<style type="text/css">
+  table{
+    font-size: 80%;
+  }
+</style>
+
+
+Pulsar configuration can be managed via a series of configuration files contained in the [`conf`](https://github.com/apache/pulsar/tree/master/conf) directory of a Pulsar [installation](getting-started-standalone.md):
+
+* [BookKeeper](#bookkeeper)
+* [Broker](#broker)
+* [Client](#client)
+* [Service discovery](#service-discovery)
+* [Log4j](#log4j)
+* [Log4j shell](#log4j-shell)
+* [Standalone](#standalone)
+* [WebSocket](#websocket)
+* [ZooKeeper](#zookeeper)
+
+## BookKeeper
+
+BookKeeper is a replicated log storage system that Pulsar uses for durable storage of all messages.
+
+
+|Name|Description|Default|
+|---|---|---|
+|bookiePort|The port on which the bookie server listens.|3181|
+|allowLoopback|Whether the bookie is allowed to use a loopback interface as its primary interface (i.e. the interface used to establish its identity). By default, loopback interfaces are not allowed as the primary interface. Using a loopback interface as the primary interface usually indicates a configuration error. For example, it’s fairly common in some VPS setups to not configure a hostname or to have the hostname resolve to `127.0.0.1`. If this is the case, then all bookies in the cluster will establish their identities as `127.0.0.1:3181` and only one will be able to join the cluster. For VPSs configured like this, you should explicitly set the listening interface.|false|
+|listeningInterface|The network interface on which the bookie listens. If not set, the bookie will listen on all interfaces.|eth0|
+|journalDirectory|The directory where Bookkeeper outputs its write-ahead log (WAL)|data/bookkeeper/journal|
+|ledgerDirectories|The directory where Bookkeeper outputs ledger snapshots. This could define multiple directories to store snapshots separated by comma, for example `ledgerDirectories=/tmp/bk1-data,/tmp/bk2-data`. Ideally, ledger dirs and the journal dir are each in a different device, which reduces the contention between random I/O and sequential write. It is possible to run with a single disk, but performance will be significantly lower.|data/bookkeeper/ledgers|
+|ledgerManagerType|The type of ledger manager used to manage how ledgers are stored, managed, and garbage collected. See [BookKeeper Internals](http://bookkeeper.apache.org/docs/latest/getting-started/concepts) for more info.|hierarchical|
+|zkLedgersRootPath|The root ZooKeeper path used to store ledger metadata. This parameter is used by the ZooKeeper-based ledger manager as a root znode to store all ledgers.|/ledgers|
+|ledgerStorageClass|Ledger storage implementation class|org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage|
+|entryLogFilePreallocationEnabled|Enable or disable entry logger preallocation|true|
+|logSizeLimit|Max file size of the entry logger, in bytes. A new entry log file will be created when the old one reaches the file size limitation.|2147483648|
+|minorCompactionThreshold|Threshold of minor compaction. Entry log files whose remaining size percentage reaches below this threshold will be compacted in a minor compaction. If set to less than zero, the minor compaction is disabled.|0.2|
+|minorCompactionInterval|Time interval to run minor compaction, in seconds. If set to less than zero, the minor compaction is disabled.|3600|
+|majorCompactionThreshold|The threshold of major compaction. Entry log files whose remaining size percentage reaches below this threshold will be compacted in a major compaction. Those entry log files whose remaining size percentage is still higher than the threshold will never be compacted. If set to less than zero, the minor compaction is disabled.|0.5|
+|majorCompactionInterval|The time interval to run major compaction, in seconds. If set to less than zero, the major compaction is disabled.|86400|
+|compactionMaxOutstandingRequests|Sets the maximum number of entries that can be compacted without flushing. When compacting, the entries are written to the entrylog and the new offsets are cached in memory. Once the entrylog is flushed the index is updated with the new offsets. This parameter controls the number of entries added to the entrylog before a flush is forced. A higher value for this parameter means more memory will be used for offsets. Each offset consists of 3 longs. This parameter should not be modified unless you’re fully aware of the consequences.|100000|
+|compactionRate|The rate at which compaction will read entries, in adds per second.|1000|
+|isThrottleByBytes|Throttle compaction by bytes or by entries.|false|
+|compactionRateByEntries|The rate at which compaction will read entries, in adds per second.|1000|
+|compactionRateByBytes|Sets the rate at which compaction reads entries, in bytes added per second.|1000000|
+|journalMaxSizeMB|Max file size of journal file, in megabytes. A new journal file will be created when the old one reaches the file size limitation.|2048|
+|journalMaxBackups|The max number of old journal files to keep. Keeping a number of old journal files can help data recovery in special cases.|5|
+|journalPreAllocSizeMB|How much space to pre-allocate at a time in the journal, in megabytes.|16|
+|journalWriteBufferSizeKB|The size of the write buffers used for the journal, in kilobytes.|64|
+|journalRemoveFromPageCache|Whether pages should be removed from the page cache after force write.|true|
+|journalAdaptiveGroupWrites|Whether to group journal force writes, which optimizes group commit for higher throughput.|true|
+|journalMaxGroupWaitMSec|The maximum latency to impose on a journal write to achieve grouping.|1|
+|journalAlignmentSize|All the journal writes and commits should be aligned to given size|4096|
+|journalBufferedWritesThreshold|Maximum writes to buffer to achieve grouping|524288|
+|journalFlushWhenQueueEmpty|If we should flush the journal when journal queue is empty|false|
+|numJournalCallbackThreads|The number of threads that should handle journal callbacks|8|
+|rereplicationEntryBatchSize|The number of max entries to keep in fragment for re-replication|5000|
+|gcWaitTime|The interval between garbage collection runs, in milliseconds. Since garbage collection runs in the background, overly frequent GC will hurt performance. It is better to use a longer GC interval if there is enough disk capacity.|900000|
+|gcOverreplicatedLedgerWaitTime|The interval between garbage collection runs for overreplicated ledgers, in milliseconds. This should not run very frequently, since the metadata for all the ledgers on the bookie is read from ZooKeeper.|86400000|
+|flushInterval|The interval at which ledger index pages are flushed to disk, in milliseconds. Flushing index files introduces a lot of random disk I/O. If the journal dir and ledger dirs are on separate devices, flushing does not affect performance; but if they are on the same device, performance degrades significantly with too-frequent flushing. You can increase the flush interval for better performance, at the cost of longer bookie restart times after a failure.|60000|
+|bookieDeathWatchInterval|Interval to watch whether bookie is dead or not, in milliseconds|1000|
+|zkServers|A list of one or more servers on which ZooKeeper is running. The server list can be comma-separated, for example: zkServers=zk1:2181,zk2:2181,zk3:2181.|localhost:2181|
+|zkTimeout|The ZooKeeper client session timeout, in milliseconds. The bookie server will exit if it receives SESSION_EXPIRED because it was partitioned off from ZooKeeper for longer than the session timeout. JVM garbage collection or disk I/O can cause SESSION_EXPIRED; increasing this value can help avoid the issue.|30000|
+|serverTcpNoDelay|This setting enables or disables Nagle's algorithm, which improves the efficiency of TCP/IP networks by reducing the number of packets sent over the network. If you are sending many small messages, such that more than one can fit in a single IP packet, setting this to false to enable the Nagle algorithm can provide better performance.|true|
+|openFileLimit|The max number of ledger index files that can be opened in the bookie server. If the number of open ledger index files reaches this limit, the bookie server starts swapping some ledgers from memory to disk. Too-frequent swapping affects performance; you can tune this number according to your requirements.|0|
+|pageSize|The size of an index page in the ledger cache, in bytes. A larger index page improves the performance of writing pages to disk, which is efficient when you have a small number of ledgers with similar numbers of entries. If you have a large number of ledgers, each with fewer entries, a smaller index page improves memory usage.|8192|
+|pageLimit|How many index pages are provided in the ledger cache. If the number of index pages reaches this limit, the bookie server starts swapping some ledgers from memory to disk. You can increase this value when swapping becomes frequent, but make sure pageLimit*pageSize does not exceed the JVM max memory limit, otherwise you will get an OutOfMemoryException. In general, increasing pageLimit and using a smaller index page gives better performance with a large number of ledgers that each have few entries. If pageLimit is -1, the bookie server uses 1/3 of the JVM memory to compute the limit on the number of index pages.|0|
+|readOnlyModeEnabled|If all configured ledger directories are full, serve only read requests from clients. If readOnlyModeEnabled=true, the bookie is converted to read-only mode when all ledger disks are full and serves only read requests; otherwise the bookie shuts down.|true|
+|diskUsageThreshold|For each ledger dir, the maximum disk space that can be used. Default is 0.95, i.e. at most 95% of the disk can be used, after which nothing is written to that partition. If all ledger dir partitions are full, the bookie turns read-only if readOnlyModeEnabled=true is set, else it shuts down. Valid values are between 0 and 1 (exclusive).|0.95|
+|diskCheckInterval|Disk check interval in milli seconds, interval to check the ledger dirs usage.|10000|
+|auditorPeriodicCheckInterval|Interval at which the auditor will do a check of all ledgers in the cluster. By default this runs once a week. The interval is set in seconds. To disable the periodic check completely, set this to 0. Note that periodic checking will put extra load on the cluster, so it should not be run more frequently than once a day.|604800|
+|auditorPeriodicBookieCheckInterval|The interval between auditor bookie checks. The auditor bookie check, checks ledger metadata to see which bookies should contain entries for each ledger. If a bookie which should contain entries is unavailable, thea the ledger containing that entry is marked for recovery. Setting this to 0 disabled the periodic check. Bookie checks will still run when a bookie fails. The interval is specified in seconds.|86400|
+|numAddWorkerThreads|number of threads that should handle write requests. if zero, the writes would be handled by netty threads directly.|0|
+|numReadWorkerThreads|number of threads that should handle read requests. if zero, the reads would be handled by netty threads directly.|8|
+|maxPendingReadRequestsPerThread|If read workers threads are enabled, limit the number of pending requests, to avoid the executor queue to grow indefinitely.|2500|
+|readBufferSizeBytes|The number of bytes we should use as capacity for BufferedReadChannel.|4096|
+|writeBufferSizeBytes|The number of bytes used as capacity for the write buffer|65536|
+|useHostNameAsBookieID|Whether the bookie should use its hostname to register with the coordination service (e.g.: zookeeper service). When false, bookie will use its ipaddress for the registration.|false|
+|statsProviderClass||org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider|
+|prometheusStatsHttpPort||8000|
+|dbStorage_writeCacheMaxSizeMb|Size of the write cache, in MB. Memory is allocated from JVM direct memory. The write cache is used to buffer entries before flushing them into the entry log. For good performance, it should be big enough to hold a substantial amount of entries in the flush interval.|512|
+|dbStorage_readAheadCacheMaxSizeMb|Size of the read cache, in MB. Memory is allocated from JVM direct memory. This read cache is pre-filled via read-ahead whenever a cache miss happens.|256|
+|dbStorage_readAheadCacheBatchSize|How many entries to pre-fill in the cache after a read cache miss.|1000|
+|dbStorage_rocksDB_blockCacheSize|Size of the RocksDB block cache, in bytes. For best performance, this cache should be big enough to hold a significant portion of the index database, which can reach ~2GB in some cases.|268435456|
+|dbStorage_rocksDB_writeBufferSizeMB||64|
+|dbStorage_rocksDB_sstSizeInMB||64|
+|dbStorage_rocksDB_blockSize||65536|
+|dbStorage_rocksDB_bloomFilterBitsPerKey||10|
+|dbStorage_rocksDB_numLevels||-1|
+|dbStorage_rocksDB_numFilesInLevel0||4|
+|dbStorage_rocksDB_maxSizeInLevel1MB||256|
+
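+As a minimal sketch of how these parameters fit together in `conf/bookkeeper.conf` (the hostnames and values below are hypothetical illustrations, not recommendations):
+
+```properties
+# ZooKeeper ensemble used by the bookie (hypothetical hosts)
+zkServers=zk1:2181,zk2:2181,zk3:2181
+zkTimeout=30000
+
+# Flush ledger index pages once per minute
+flushInterval=60000
+
+# DbLedgerStorage caches, allocated from JVM direct memory
+dbStorage_writeCacheMaxSizeMb=512
+dbStorage_readAheadCacheMaxSizeMb=256
+```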
+
+
+## Broker
+
+Pulsar brokers are responsible for handling incoming messages from producers, dispatching messages to consumers, replicating data between clusters, and more.
+
+|Name|Description|Default|
+|---|---|---|
+|enablePersistentTopics|  Whether persistent topics are enabled on the broker |true|
+|enableNonPersistentTopics| Whether non-persistent topics are enabled on the broker |true|
+|functionsWorkerEnabled|  Whether the Pulsar Functions worker service is enabled in the broker  |false|
+|zookeeperServers|  Zookeeper quorum connection string  ||
+|globalZookeeperServers|  Global Zookeeper quorum connection string ||
+|brokerServicePort| Broker data port  |6650|
+|brokerServicePortTls|  Broker data port for TLS  |6651|
+|webServicePort|  Port to use to serve HTTP requests  |8080|
+|webServicePortTls| Port to use to serve HTTPS requests |8443|
+|webSocketServiceEnabled| Enable the WebSocket API service in broker  |false|
+|bindAddress| Hostname or IP address the service binds on, default is 0.0.0.0.  |0.0.0.0|
+|advertisedAddress| Hostname or IP address the service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used.  ||
+|clusterName| Name of the cluster to which this broker belongs ||
+|brokerDeduplicationEnabled|  Sets the default behavior for message deduplication in the broker. If enabled, the broker will reject messages that were already stored in the topic. This setting can be overridden on a per-namespace basis.  |false|
+|brokerDeduplicationMaxNumberOfProducers| The maximum number of producers for which information will be stored for deduplication purposes.  |10000|
+|brokerDeduplicationEntriesInterval|  The number of entries after which a deduplication informational snapshot is taken. A larger interval will lead to fewer snapshots being taken, though this would also lengthen the topic recovery time (the time required for entries published after the snapshot to be replayed). |1000|
+|brokerDeduplicationProducerInactivityTimeoutMinutes| The time of inactivity (in minutes) after which the broker will discard deduplication information related to a disconnected producer. |360|
+|zooKeeperSessionTimeoutMillis| Zookeeper session timeout in milliseconds |30000|
+|brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After this time elapses, the process will be killed  |60000|
+|backlogQuotaCheckEnabled|  Enable backlog quota check. Enforces action on topic when the quota is reached  |true|
+|backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the quota |60|
+|backlogQuotaDefaultLimitGB|  Default per-topic backlog quota limit |10|
+|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics  |true|
+|brokerDeleteInactiveTopicsFrequencySeconds|  How often to check for inactive topics  |60|
+|messageExpiryCheckIntervalInMinutes| How frequently to proactively check and purge expired messages  |5|
+|brokerServiceCompactionMonitorIntervalInSeconds| Interval between checks to see if topics with compaction policies need to be compacted  |60|
+|activeConsumerFailoverDelayTimeMillis| How long to delay rewinding cursor and dispatching messages when active consumer is changed.  |1000|
+|clientLibraryVersionCheckEnabled|  Enable check for minimum allowed client library version |false|
+|clientLibraryVersionCheckAllowUnversioned| Allow client libraries with no version information  |true|
+|statusFilePath|  Path for the file used to determine the rotation status for the broker when responding to service discovery health checks ||
+|preferLaterVersions| If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to use only brokers running the latest software version (to minimize impact to bundles)  |false|
+|tlsEnabled|  Enable TLS  |false|
+|tlsCertificateFilePath|  Path for the TLS certificate file ||
+|tlsKeyFilePath|  Path for the TLS private key file ||
+|tlsTrustCertsFilePath| Path for the trusted TLS certificate file ||
+|tlsAllowInsecureConnection|  Accept untrusted TLS certificate from client  |false|
+|tlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
+|tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tokenSecretKey| Configure the secret key to be used to validate auth tokens. The key can be specified like: `tokenSecretKey=data:base64,xxxxxxxxx` or `tokenSecretKey=file:///my/secret.key`||
+|tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||
+|maxUnackedMessagesPerConsumer| Maximum number of unacknowledged messages allowed for a consumer on a shared subscription. The broker stops sending messages to a consumer once this limit is reached, until the consumer starts acknowledging messages again. A value of 0 disables the unacked message limit check, and consumers can receive messages without any restriction  |50000|
+|maxUnackedMessagesPerSubscription| Maximum number of unacknowledged messages allowed per shared subscription. The broker stops dispatching messages to all consumers of the subscription once this limit is reached, until consumers start acknowledging messages again and the unacked count drops to limit/2. A value of 0 disables the unacked message limit check, and the dispatcher can dispatch messages without any restriction  |200000|
+|maxConcurrentLookupRequest|  Maximum number of concurrent lookup requests the broker allows, to throttle heavy incoming lookup traffic |50000|
+|maxConcurrentTopicLoadRequest| Maximum number of concurrent topic loading requests the broker allows, to control the number of ZooKeeper operations |5000|
+|authenticationEnabled| Enable authentication |false|
+|authenticationProviders| Authentication provider name list, as a comma-separated list of class names  ||
+|authorizationEnabled|  Enforce authorization |false|
+|superUserRoles|  Role names that are treated as “super-user”, meaning they will be able to do all admin operations and publish/consume from all topics ||
+|brokerClientAuthenticationPlugin|  Authentication settings of the broker itself. Used when the broker connects to other brokers, either in same or other clusters  ||
+|brokerClientAuthenticationParameters|||
+|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication  ||
+|bookkeeperClientAuthenticationPlugin|  Authentication plugin to use when connecting to bookies ||
+|bookkeeperClientAuthenticationParametersName|  BookKeeper auth plugin implementation-specific parameter names and values  ||
+|bookkeeperClientAuthenticationParameters|||   
+|bookkeeperClientTimeoutInSeconds|  Timeout for BK add / read operations  |30|
+|bookkeeperClientSpeculativeReadTimeoutInMillis|  Speculative reads are initiated if a read request doesn’t complete within a certain time. A value of 0 disables speculative reads |0|
+|bookkeeperClientHealthCheckEnabled|  Enable bookie health checks. Bookies that have more than the configured number of failures within the interval will be quarantined for some time. During this period, new ledgers won’t be created on these bookies  |true|
+|bookkeeperClientHealthCheckIntervalSeconds||60|
+|bookkeeperClientHealthCheckErrorThresholdPerInterval||5|
+|bookkeeperClientHealthCheckQuarantineTimeInSeconds ||1800|
+|bookkeeperClientRackawarePolicyEnabled|  Enable rack-aware bookie selection policy. BookKeeper will choose bookies from different racks when forming a new bookie ensemble  |true|
+|bookkeeperClientRegionawarePolicyEnabled|  Enable region-aware bookie selection policy. BookKeeper will choose bookies from different regions and racks when forming a new bookie ensemble. If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored  |false|
+|bookkeeperClientReorderReadSequenceEnabled|  Enable/disable reordering read sequence on reading entries.  |false|
+|bookkeeperClientIsolationGroups| Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie outside the specified groups will not be used by the broker  ||
+|managedLedgerDefaultEnsembleSize|  Number of bookies to use when creating a ledger |2|
+|managedLedgerDefaultWriteQuorum| Number of copies to store for each message  |2|
+|managedLedgerDefaultAckQuorum| Number of guaranteed copies (acks to wait before write is complete) |2|
+|managedLedgerCacheSizeMB|  Amount of memory to use for caching data payload in managed ledger. This memory is allocated from JVM direct memory and it’s shared across all the topics running in the same broker  |1024|
+|managedLedgerCacheEvictionWatermark| Threshold to which bring down the cache level when eviction is triggered  |0.9|
+|managedLedgerDefaultMarkDeleteRateLimit| Rate limit the amount of writes per second generated by consumer acking the messages  |1.0|
+|managedLedgerMaxEntriesPerLedger|  Max number of entries to append to a ledger before triggering a rollover. A ledger rollover is triggered on these conditions: <ul><li>Either the max rollover time has been reached</li><li>or max entries have been written to the ledger and at least min-time has passed</li></ul>|50000|
+|managedLedgerMinLedgerRolloverTimeMinutes| Minimum time between ledger rollover for a topic  |10|
+|managedLedgerMaxLedgerRolloverTimeMinutes| Maximum time before forcing a ledger rollover for a topic |240|
+|managedLedgerCursorMaxEntriesPerLedger|  Max number of entries to append to a cursor ledger  |50000|
+|managedLedgerCursorRolloverTimeInSeconds|  Max time before triggering a rollover on a cursor ledger  |14400|
+|managedLedgerMaxUnackedRangesToPersist|  Max number of “acknowledgment holes” that will be persistently stored. When acknowledging out of order, a consumer leaves holes that are supposed to be quickly filled by acking all the messages. The information about which messages are acknowledged is persisted by compressing it into “ranges” of acknowledged messages. After the max number of ranges is reached, the information is only tracked in memory, and messages will be redelivered in case of crashes.  |1000|
+|autoSkipNonRecoverableData|  Skip reading non-recoverable/unreadable data ledgers in the managed ledger’s list. This helps when data ledgers get corrupted in BookKeeper and the managed cursor is stuck at that ledger. |false|
+|loadBalancerEnabled| Enable load balancer  |true|
+|loadBalancerPlacementStrategy| Strategy to assign a new bundle |weightedRandomSelection|
+|loadBalancerReportUpdateThresholdPercentage| Percentage of change to trigger load report update  |10|
+|loadBalancerReportUpdateMaxIntervalMinutes|  Maximum interval between load report updates  |15|
+|loadBalancerHostUsageCheckIntervalMinutes| Frequency, in minutes, at which to collect host usage reports  |1|
+|loadBalancerSheddingIntervalMinutes| Load shedding interval. The broker periodically checks whether some traffic should be offloaded from overloaded brokers to underloaded brokers  |30|
+|loadBalancerSheddingGracePeriodMinutes|  Prevent the same topics from being shed and moved to another broker more than once within this timeframe |30|
+|loadBalancerBrokerMaxTopics| Usage threshold to allocate max number of topics to broker  |50000|
+|loadBalancerBrokerUnderloadedThresholdPercentage|  Usage threshold to determine a broker as under-loaded |1|
+|loadBalancerBrokerOverloadedThresholdPercentage| Usage threshold to determine a broker as over-loaded  |85|
+|loadBalancerResourceQuotaUpdateIntervalMinutes|  Interval to update namespace bundle resource quota |15|
+|loadBalancerBrokerComfortLoadLevelPercentage|  Usage threshold to determine whether a broker has just the right level of load  |65|
+|loadBalancerAutoBundleSplitEnabled|  Enable/disable automatic namespace bundle split  |false|
+|loadBalancerNamespaceBundleMaxTopics|  Maximum number of topics in a bundle; exceeding it triggers a bundle split  |1000|
+|loadBalancerNamespaceBundleMaxSessions|  Maximum number of sessions (producers + consumers) in a bundle; exceeding it triggers a bundle split  |1000|
+|loadBalancerNamespaceBundleMaxMsgRate| Maximum message rate (in + out) in a bundle; exceeding it triggers a bundle split  |1000|
+|loadBalancerNamespaceBundleMaxBandwidthMbytes| Maximum bandwidth (in + out) in a bundle; exceeding it triggers a bundle split  |100|
+|loadBalancerNamespaceMaximumBundles| Maximum number of bundles in a namespace  |128|
+|replicationMetricsEnabled| Enable replication metrics  |true|
+|replicationConnectionsPerBroker| Max number of connections to open for each broker in a remote cluster. More connections host-to-host lead to better throughput over high-latency links  |16|
+|replicationProducerQueueSize|  Replicator producer queue size  |1000|
+|replicatorPrefix|  Replicator prefix used for replicator producer name and cursor name |pulsar.repl|
+|replicationTlsEnabled| Enable TLS when talking with other clusters to replicate messages |false|
+|defaultRetentionTimeInMinutes| Default message retention time  ||
+|defaultRetentionSizeInMB|  Default retention size  |0|
+|keepAliveIntervalSeconds|  How often to check whether the connections are still alive  |30|
+|brokerServicePurgeInactiveFrequencyInSeconds|  How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected) |60|
+|loadManagerClassName|  Name of load manager to use |org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl|
+|managedLedgerOffloadDriver|  Driver to use to offload old data to long term storage (Possible values: S3)  ||
+|managedLedgerOffloadMaxThreads|  Maximum number of thread pool threads for ledger offloading |2|
+|s3ManagedLedgerOffloadRegion|  For Amazon S3 ledger offload, AWS region  ||
+|s3ManagedLedgerOffloadBucket|  For Amazon S3 ledger offload, Bucket to place offloaded ledger into ||
+|s3ManagedLedgerOffloadServiceEndpoint| For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) ||
+|s3ManagedLedgerOffloadMaxBlockSizeInBytes| For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) |67108864|
+|s3ManagedLedgerOffloadReadBufferSizeInBytes| For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)  |1048576|
+
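+As a hedged illustration of combining a few of these settings, a `broker.conf` fragment enabling deduplication and S3 offload might look like the following (the region and bucket names are hypothetical):
+
+```properties
+# Reject messages that were already stored in the topic (overridable per namespace)
+brokerDeduplicationEnabled=true
+brokerDeduplicationEntriesInterval=1000
+
+# Offload old ledgers to Amazon S3 (hypothetical region and bucket)
+managedLedgerOffloadDriver=S3
+s3ManagedLedgerOffloadRegion=us-west-2
+s3ManagedLedgerOffloadBucket=my-pulsar-offload
+```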
+
+
+
+## Client
+
+The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used to publish messages to Pulsar and consume messages from Pulsar topics. This tool can be used in lieu of a client library.
+
+|Name|Description|Default|
+|---|---|---|
+|webServiceUrl| The web URL for the cluster.  |http://localhost:8080/|
+|brokerServiceUrl|  The Pulsar protocol URL for the cluster.  |pulsar://localhost:6650/|
+|authPlugin|  The authentication plugin.  ||
+|authParams|  The authentication parameters for the cluster, as a comma-separated string. ||
+|useTls|  Whether or not TLS authentication will be enforced in the cluster.  |false|
+|tlsAllowInsecureConnection|||    
+|tlsTrustCertsFilePath|||
+
+
+## Service discovery
+
+|Name|Description|Default|
+|---|---|---|
+|zookeeperServers|  Zookeeper quorum connection string (comma-separated)  ||
+|globalZookeeperServers|  Global zookeeper quorum connection string (comma-separated) ||
+|zookeeperSessionTimeoutMs| ZooKeeper session timeout |30000|
+|servicePort| Port to use to serve binary protocol requests  |6650|
+|servicePortTls|  Port to use to serve binary protocol TLS requests  |6651|
+|webServicePort|  Port that the discovery service listens on |8080|
+|webServicePortTls| Port to use to serve HTTPS requests |8443|
+|bindOnLocalhost| Control whether to bind directly on localhost rather than on normal hostname  |false|
+|authenticationEnabled| Enable authentication |false|
+|authenticationProviders| Authentication provider name list (a comma-separated list of class names) ||
+|authorizationEnabled|  Enforce authorization |false|
+|superUserRoles|  Role names that are treated as “super-user”, meaning they will be able to do all admin operations and publish/consume from all topics (comma-separated) ||
+|tlsEnabled|  Enable TLS  |false|
+|tlsCertificateFilePath|  Path for the TLS certificate file ||
+|tlsKeyFilePath|  Path for the TLS private key file ||
+
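+A sketch of a service-discovery configuration using the parameters above (the ZooKeeper hosts are hypothetical):
+
+```properties
+zookeeperServers=zk1:2181,zk2:2181,zk3:2181
+servicePort=6650
+webServicePort=8080
+```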
+
+
+## Log4j
+
+
+|Name|Default|
+|---|---|
+|pulsar.root.logger|  WARN,CONSOLE|
+|pulsar.log.dir|  logs|
+|pulsar.log.file| pulsar.log|
+|log4j.rootLogger|  ${pulsar.root.logger}|
+|log4j.appender.CONSOLE|  org.apache.log4j.ConsoleAppender|
+|log4j.appender.CONSOLE.Threshold|  DEBUG|
+|log4j.appender.CONSOLE.layout| org.apache.log4j.PatternLayout|
+|log4j.appender.CONSOLE.layout.ConversionPattern| %d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n|
+|log4j.appender.ROLLINGFILE|  org.apache.log4j.DailyRollingFileAppender|
+|log4j.appender.ROLLINGFILE.Threshold|  DEBUG|
+|log4j.appender.ROLLINGFILE.File| ${pulsar.log.dir}/${pulsar.log.file}|
+|log4j.appender.ROLLINGFILE.layout| org.apache.log4j.PatternLayout|
+|log4j.appender.ROLLINGFILE.layout.ConversionPattern| %d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n|
+|log4j.appender.TRACEFILE|  org.apache.log4j.FileAppender|
+|log4j.appender.TRACEFILE.Threshold|  TRACE|
+|log4j.appender.TRACEFILE.File| pulsar-trace.log|
+|log4j.appender.TRACEFILE.layout| org.apache.log4j.PatternLayout|
+|log4j.appender.TRACEFILE.layout.ConversionPattern| %d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n|
+
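+As an illustrative override of the defaults above (the log directory is hypothetical), raising the root logger level and logging to the rolling file appender would look like:
+
+```properties
+pulsar.root.logger=INFO,ROLLINGFILE
+pulsar.log.dir=/var/log/pulsar
+pulsar.log.file=pulsar.log
+```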
+
+## Log4j shell
+
+|Name|Default|
+|---|---|
+|bookkeeper.root.logger|  ERROR,CONSOLE|
+|log4j.rootLogger|  ${bookkeeper.root.logger}|
+|log4j.appender.CONSOLE|  org.apache.log4j.ConsoleAppender|
+|log4j.appender.CONSOLE.Threshold|  DEBUG|
+|log4j.appender.CONSOLE.layout| org.apache.log4j.PatternLayout|
+|log4j.appender.CONSOLE.layout.ConversionPattern| %d{ABSOLUTE} %-5p %m%n|
+|log4j.logger.org.apache.zookeeper| ERROR|
+|log4j.logger.org.apache.bookkeeper|  ERROR|
+|log4j.logger.org.apache.bookkeeper.bookie.BookieShell| INFO|
+
+
+## Standalone
+
+|Name|Description|Default|
+|---|---|---|
+|zookeeperServers|  The quorum connection string for local ZooKeeper  ||
+|globalZookeeperServers|  The quorum connection string for global ZooKeeper ||
+|brokerServicePort| The port on which the standalone broker listens for connections |6650|
+|webServicePort|  The port used by the standalone broker for HTTP requests  |8080|
+|bindAddress| The hostname or IP address on which the standalone service binds  |0.0.0.0|
+|advertisedAddress| The hostname or IP address that the standalone service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used.  ||
+|clusterName| The name of the cluster that this broker belongs to. |standalone|
+|zooKeeperSessionTimeoutMillis| The ZooKeeper session timeout, in milliseconds. |30000|
+|brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After this time elapses, the process will be killed. |60000|
+|backlogQuotaCheckEnabled|  Enable the backlog quota check, which enforces a specified action when the quota is reached.  |true|
+|backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the backlog quota.  |60|
+|backlogQuotaDefaultLimitGB|  The default per-topic backlog quota limit.  |10|
+|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics. |true|
+|brokerDeleteInactiveTopicsFrequencySeconds|  How often to check for inactive topics, in seconds. |60|
+|messageExpiryCheckIntervalInMinutes| How often to proactively check for and purge expired messages. |5|
+|activeConsumerFailoverDelayTimeMillis| How long to delay rewinding cursor and dispatching messages when active consumer is changed.  |1000|
+|clientLibraryVersionCheckEnabled|  Enable checks for minimum allowed client library version. |false|
+|clientLibraryVersionCheckAllowUnversioned| Allow client libraries with no version information  |true|
+|statusFilePath|  The path for the file used to determine the rotation status for the broker when responding to service discovery health checks |/usr/local/apache/htdocs|
+|maxUnackedMessagesPerConsumer| The maximum number of unacknowledged messages allowed to be received by consumers on a shared subscription. The broker will stop sending messages to a consumer once this limit is reached or until the consumer begins acknowledging messages. A value of 0 disables the unacked message limit check and thus allows consumers to receive messages without any restrictions. |50000|
+|maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer.  |200000|
+|authenticationEnabled| Enable authentication for the broker. |false|
+|authenticationProviders| A comma-separated list of class names for authentication providers. |false|
+|authorizationEnabled|  Enforce authorization in brokers. |false|
+|superUserRoles|  Role names that are treated as “superusers.” Superusers are authorized to perform all admin tasks. ||  
+|brokerClientAuthenticationPlugin|  The authentication settings of the broker itself. Used when the broker connects to other brokers either in the same cluster or from other clusters. ||
+|brokerClientAuthenticationParameters|  The parameters that go along with the plugin specified using brokerClientAuthenticationPlugin.  ||
+|athenzDomainNames| Supported Athenz authentication provider domain names as a comma-separated list.  ||
+|bookkeeperClientAuthenticationPlugin|  Authentication plugin to be used when connecting to bookies (BookKeeper servers). ||
+|bookkeeperClientAuthenticationParametersName|  BookKeeper authentication plugin implementation parameters and values.  ||
+|bookkeeperClientAuthenticationParameters|  Parameters associated with the bookkeeperClientAuthenticationParametersName ||
+|bookkeeperClientTimeoutInSeconds|  Timeout for BookKeeper add and read operations. |30|
+|bookkeeperClientSpeculativeReadTimeoutInMillis|  Speculative reads are initiated if a read request doesn’t complete within a certain time. A value of 0 disables speculative reads.  |0|
+|bookkeeperClientHealthCheckEnabled|  Enable bookie health checks.  |true|
+|bookkeeperClientHealthCheckIntervalSeconds|  The time interval, in seconds, at which health checks are performed. New ledgers are not created during health checks.  |60|
+|bookkeeperClientHealthCheckErrorThresholdPerInterval|  Error threshold for health checks.  |5|
+|bookkeeperClientHealthCheckQuarantineTimeInSeconds|  If bookies have more than the allowed number of failures within the interval specified by bookkeeperClientHealthCheckIntervalSeconds, they are quarantined for this many seconds. |1800|
+|bookkeeperClientRackawarePolicyEnabled|    |true|
+|bookkeeperClientRegionawarePolicyEnabled|    |false|
+|bookkeeperClientReorderReadSequenceEnabled|    |false|
+|bookkeeperClientIsolationGroups|||   
+|managedLedgerDefaultEnsembleSize|    |1|
+|managedLedgerDefaultWriteQuorum|   |1|
+|managedLedgerDefaultAckQuorum|   |1|
+|managedLedgerCacheSizeMB|    |1024|
+|managedLedgerCacheEvictionWatermark|   |0.9|
+|managedLedgerDefaultMarkDeleteRateLimit|   |0.1|
+|managedLedgerMaxEntriesPerLedger|    |50000|
+|managedLedgerMinLedgerRolloverTimeMinutes|   |10|
+|managedLedgerMaxLedgerRolloverTimeMinutes|   |240|
+|managedLedgerCursorMaxEntriesPerLedger|    |50000|
+|managedLedgerCursorRolloverTimeInSeconds|    |14400|
+|autoSkipNonRecoverableData|    |false|
+|loadBalancerEnabled|   |false|
+|loadBalancerPlacementStrategy|   |weightedRandomSelection|
+|loadBalancerReportUpdateThresholdPercentage|   |10|
+|loadBalancerReportUpdateMaxIntervalMinutes|    |15|
+|loadBalancerHostUsageCheckIntervalMinutes|  |1|
+|loadBalancerSheddingIntervalMinutes|   |30|
+|loadBalancerSheddingGracePeriodMinutes|    |30|
+|loadBalancerBrokerMaxTopics|   |50000|
+|loadBalancerBrokerUnderloadedThresholdPercentage|    |1|
+|loadBalancerBrokerOverloadedThresholdPercentage|   |85|
+|loadBalancerResourceQuotaUpdateIntervalMinutes|    |15|
+|loadBalancerBrokerComfortLoadLevelPercentage|    |65|
+|loadBalancerAutoBundleSplitEnabled|    |false|
+|loadBalancerNamespaceBundleMaxTopics|    |1000|
+|loadBalancerNamespaceBundleMaxSessions|    |1000|
+|loadBalancerNamespaceBundleMaxMsgRate|   |1000|
+|loadBalancerNamespaceBundleMaxBandwidthMbytes|   |100|
+|loadBalancerNamespaceMaximumBundles|   |128|
+|replicationMetricsEnabled|   |true|
+|replicationConnectionsPerBroker|   |16|
+|replicationProducerQueueSize|    |1000|
+|defaultRetentionTimeInMinutes|   |0|
+|defaultRetentionSizeInMB|    |0|
+|keepAliveIntervalSeconds|    |30|
+|brokerServicePurgeInactiveFrequencyInSeconds|    |60|
+
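+For instance, a minimal `conf/standalone.conf` sketch using the parameters above (the values shown are the listed defaults):
+
+```properties
+clusterName=standalone
+brokerServicePort=6650
+webServicePort=8080
+zooKeeperSessionTimeoutMillis=30000
+```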
+
+
+
+
+## WebSocket
+
+|Name|Description|Default|
+|---|---|---|
+|globalZookeeperServers    |||
+|zooKeeperSessionTimeoutMillis|   |30000|
+|serviceUrl|||
+|serviceUrlTls|||
+|brokerServiceUrl|||
+|brokerServiceUrlTls|||
+|webServicePort||8080|
+|webServicePortTls||8443|
+|bindAddress||0.0.0.0|
+|clusterName |||
+|authenticationEnabled||false|
+|authenticationProviders|||   
+|authorizationEnabled||false|
+|superUserRoles |||
+|brokerClientAuthenticationPlugin|||
+|brokerClientAuthenticationParameters|||
+|tlsEnabled||false|
+|tlsAllowInsecureConnection||false|
+|tlsCertificateFilePath|||
+|tlsKeyFilePath |||
+|tlsTrustCertsFilePath|||
+
+
+## Pulsar proxy
+
+The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be configured in the `conf/proxy.conf` file.
+
+
+|Name|Description|Default|
+|---|---|---|
+|zookeeperServers|  The ZooKeeper quorum connection string (as a comma-separated list)  ||
+|configurationStoreServers| Configuration store connection string (as a comma-separated list) ||
+|zookeeperSessionTimeoutMs| ZooKeeper session timeout (in milliseconds) |30000|
+|servicePort| The port to use to serve binary protocol requests |6650|
+|servicePortTls|  The port to use to serve binary protocol TLS requests  |6651|
+|statusFilePath|  Path for the file used to determine the rotation status for the proxy instance when responding to service discovery health checks ||
+|authenticationEnabled| Whether authentication is enabled for the Pulsar proxy  |false|
+|authenticationProviders| Authentication provider name list (a comma-separated list of class names) ||
+|authorizationEnabled|  Whether authorization is enforced by the Pulsar proxy |false|
+|authorizationProvider| Authorization provider as a fully qualified class name  |org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider|
+|brokerClientAuthenticationPlugin|  The authentication plugin used by the Pulsar proxy to authenticate with Pulsar brokers  ||
+|brokerClientAuthenticationParameters|  The authentication parameters used by the Pulsar proxy to authenticate with Pulsar brokers  ||
+|brokerClientTrustCertsFilePath|  The path to trusted certificates used by the Pulsar proxy to authenticate with Pulsar brokers ||
+|superUserRoles|  Role names that are treated as “super-users,” meaning that they will be able to perform all admin operations and publish/consume from all topics ||
+|forwardAuthorizationCredentials| Whether client authorization credentials are forwarded to the broker for re-authorization. Authentication must be enabled via authenticationEnabled=true for this to take effect.  |false|
+|maxConcurrentInboundConnections| Max concurrent inbound connections. The proxy will reject requests beyond that. |10000|
+|maxConcurrentLookupRequests| Max concurrent outbound connections. The proxy will error out requests beyond that. |50000|
+|tlsEnabledInProxy| Whether TLS is enabled for the proxy  |false|
+|tlsEnabledWithBroker|  Whether TLS is enabled when communicating with Pulsar brokers |false|
+|tlsCertificateFilePath|  Path for the TLS certificate file ||
+|tlsKeyFilePath|  Path for the TLS private key file ||
+|tlsTrustCertsFilePath| Path for the trusted TLS certificate pem file ||
+|tlsHostnameVerificationEnabled|  Whether the hostname is validated when the proxy creates a TLS connection with brokers  |false|
+|tlsRequireTrustedClientCertOnConnect|  Whether client certificates are required for TLS. Connections are rejected if the client certificate isn’t trusted. |false|
+|tlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
+|tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tokenSecretKey| Configure the secret key to be used to validate auth tokens. The key can be specified like: `tokenSecretKey=data:base64,xxxxxxxxx` or `tokenSecretKey=file:///my/secret.key`||
+|tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||
+
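+As a sketch, a minimal `conf/proxy.conf` using the parameters above might contain the following (the ZooKeeper hosts and ports are hypothetical):
+
+```properties
+zookeeperServers=zk1:2181,zk2:2181,zk3:2181
+configurationStoreServers=zk1:2184,zk2:2184,zk3:2184
+servicePort=6650
+servicePortTls=6651
+```
+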
+## ZooKeeper
+
+ZooKeeper handles a broad range of essential configuration- and coordination-related tasks for Pulsar. The default configuration file for ZooKeeper is in the `conf/zookeeper.conf` file in your Pulsar installation. The following parameters are available:
+
+
+|Name|Description|Default|
+|---|---|---|
+|tickTime|  The tick is the basic unit of time in ZooKeeper, measured in milliseconds and used to regulate things like heartbeats and timeouts. tickTime is the length of a single tick.  |2000|
+|initLimit| The maximum time, in ticks, that the leader ZooKeeper server allows follower ZooKeeper servers to successfully connect and sync. The tick time is set in milliseconds using the tickTime parameter. |10|
+|syncLimit| The maximum time, in ticks, that a follower ZooKeeper server is allowed to sync with other ZooKeeper servers. The tick time is set in milliseconds using the tickTime parameter.  |5|
+|dataDir| The location where ZooKeeper will store in-memory database snapshots as well as the transaction log of updates to the database. |data/zookeeper|
+|clientPort|  The port on which the ZooKeeper server will listen for connections. |2181|
+|autopurge.snapRetainCount| In ZooKeeper, auto purge determines how many recent snapshots of the database stored in dataDir to retain within the time interval specified by autopurge.purgeInterval (while deleting the rest).  |3|
+|autopurge.purgeInterval| The time interval, in hours, by which the ZooKeeper database purge task is triggered. Setting to a non-zero number will enable auto purge; setting to 0 will disable. Read the [ZooKeeper Administrator's Guide](https://zookeeper.apache.org/doc/current/zookeeperAdmin.html) before enabling auto purge. |1|
+|maxClientCnxns|  The maximum number of client connections. Increase this if you need to handle more ZooKeeper clients. |60|
+
+
+
+
+In addition to the parameters in the table above, configuring ZooKeeper for Pulsar involves adding
+a `server.N` line to the `conf/zookeeper.conf` file for each node in the ZooKeeper cluster, where `N` is the number of the ZooKeeper node. Here's an example for a three-node ZooKeeper cluster:
+
+```properties
+server.1=zk1.us-west.example.com:2888:3888
+server.2=zk2.us-west.example.com:2888:3888
+server.3=zk3.us-west.example.com:2888:3888
+```
+
+> We strongly recommend consulting the [ZooKeeper Administrator's Guide](https://zookeeper.apache.org/doc/current/zookeeperAdmin.html) for a more thorough and comprehensive introduction to ZooKeeper configuration.
diff --git a/site2/website/versioned_docs/version-2.2.1/security-tls-authentication.md b/site2/website/versioned_docs/version-2.2.1/security-tls-authentication.md
new file mode 100644
index 0000000000..d0e94fd2c5
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/security-tls-authentication.md
@@ -0,0 +1,137 @@
+---
+id: version-2.2.1-security-tls-authentication
+title: Authentication using TLS
+sidebar_label: Authentication using TLS
+original_id: security-tls-authentication
+---
+
+## TLS Authentication Overview
+
+TLS authentication is an extension of [TLS transport encryption](security-tls-transport.md), but instead of only servers having keys and certs which the client uses to verify the server's identity, clients also have keys and certs which the server uses to verify the client's identity. You must have TLS transport encryption configured on your cluster before you can use TLS authentication. This guide assumes you already have TLS transport encryption configured.
+
+### Creating client certificates
+
+Client certificates are generated using the same certificate authority as was used to generate the server certificates.
+
+The biggest difference between client certs and server certs is that the **common name** for the client certificate is the **role token** which that client will be authenticated as.
+
+First generate the key.
+```bash
+$ openssl genrsa -out admin.key.pem 2048
+```
+
+Similar to the broker, the client expects the key to be in [PKCS 8](https://en.wikipedia.org/wiki/PKCS_8) format, so convert it.
+
+```bash
+$ openssl pkcs8 -topk8 -inform PEM -outform PEM \
+      -in admin.key.pem -out admin.key-pk8.pem -nocrypt
+```
+
+Generate the certificate request. When asked for a **common name**, enter the **role token** which you want this key pair to authenticate a client as.
+
+```bash
+$ openssl req -config openssl.cnf \
+      -key admin.key.pem -new -sha256 -out admin.csr.pem
+```
+
+Sign the request with the certificate authority. Note that client certs use the **usr_cert** extension, which allows the cert to be used for client authentication.
+
+```bash
+$ openssl ca -config openssl.cnf -extensions usr_cert \
+      -days 1000 -notext -md sha256 \
+      -in admin.csr.pem -out admin.cert.pem
+```
+
+This will give you a cert, `admin.cert.pem`, and a key, `admin.key-pk8.pem`, which, with `ca.cert.pem`, can be used by clients to authenticate themselves to brokers and proxies as the role token ``admin``.
+
+## Enabling TLS Authentication ...
+
+### ... on Brokers
+
+To configure brokers to authenticate clients, put the following in `broker.conf`, alongside [the configuration to enable tls transport](security-tls-transport.md#broker-configuration):
+
+```properties
+# Configuration to enable authentication
+authenticationEnabled=true
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
+```
+
+### ... on Proxies
+
+To configure proxies to authenticate clients, put the following in `proxy.conf`, alongside [the configuration to enable tls transport](security-tls-transport.md#proxy-configuration).
+
+The proxy should have its own client key pair for connecting to brokers. The role token for this key pair should be configured in the ``proxyRoles`` of the brokers. See the [authorization guide](security-authorization.md) for more details.
+
+```properties
+# For clients connecting to the proxy
+authenticationEnabled=true
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
+
+# For the proxy to connect to brokers
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
+brokerClientAuthenticationParameters=tlsCertFile:/path/to/proxy.cert.pem,tlsKeyFile:/path/to/proxy.key-pk8.pem
+```
+
+## Client configuration
+
+When using TLS authentication, the client needs to connect via TLS transport, so you need to configure the client to use ```https://``` and port 8443 for the web service URL, and ```pulsar+ssl://``` and port 6651 for the broker service URL.
+
+### CLI tools
+
+[Command-line tools](reference-cli-tools.md) like [`pulsar-admin`](reference-pulsar-admin.md), [`pulsar-perf`](reference-cli-tools.md#pulsar-perf), and [`pulsar-client`](reference-cli-tools.md#pulsar-client) use the `conf/client.conf` config file in a Pulsar installation.
+
+You'll need to add the following parameters to that file to use TLS authentication with Pulsar's CLI tools:
+
+```properties
+webServiceUrl=https://broker.example.com:8443/
+brokerServiceUrl=pulsar+ssl://broker.example.com:6651/
+useTls=true
+tlsAllowInsecureConnection=false
+tlsTrustCertsFilePath=/path/to/ca.cert.pem
+authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
+authParams=tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem
+```
+
+### Java client
+
+```java
+import org.apache.pulsar.client.api.PulsarClient;
+
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar+ssl://broker.example.com:6651/")
+    .enableTls(true)
+    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
+    .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
+                    "tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem")
+    .build();
+```
+
+### Python client
+
+```python
+from pulsar import Client, AuthenticationTLS
+
+auth = AuthenticationTLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem")
+client = Client("pulsar+ssl://broker.example.com:6651/",
+                tls_trust_certs_file_path="/path/to/ca.cert.pem",
+                tls_allow_insecure_connection=False,
+                authentication=auth)
+```
+
+### C++ client
+
+```c++
+#include <pulsar/Client.h>
+
+pulsar::ClientConfiguration config;
+config.setUseTls(true);
+config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");
+config.setTlsAllowInsecureConnection(false);
+
+pulsar::AuthenticationPtr auth = pulsar::AuthTls::create("/path/to/my-role.cert.pem",
+                                                         "/path/to/my-role.key-pk8.pem");
+config.setAuth(auth);
+
+pulsar::Client client("pulsar+ssl://broker.example.com:6651/", config);
+```
+
diff --git a/site2/website/versioned_docs/version-2.2.1/security-tls-transport.md b/site2/website/versioned_docs/version-2.2.1/security-tls-transport.md
new file mode 100644
index 0000000000..6a041f689e
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/security-tls-transport.md
@@ -0,0 +1,228 @@
+---
+id: version-2.2.1-security-tls-transport
+title: Transport Encryption using TLS
+sidebar_label: Transport Encryption using TLS
+original_id: security-tls-transport
+---
+
+## TLS Overview
+
+By default, Apache Pulsar clients communicate with the Apache Pulsar service in plain text, which means that all data is sent in the clear. TLS can be used to encrypt this traffic so that it cannot be snooped by a man-in-the-middle attacker.
+
+TLS can be configured for both encryption and authentication. You may configure just TLS transport encryption, which is covered in this guide. TLS authentication is covered [elsewhere](security-tls-authentication.md). Alternatively, you can use [another authentication mechanism](security-athenz.md) on top of TLS transport encryption.
+
+> Note that enabling TLS may have a performance impact due to encryption overhead.
+
+## TLS concepts
+
+TLS is a form of [public key cryptography](https://en.wikipedia.org/wiki/Public-key_cryptography). Encryption is performed using key pairs consisting of a public key and a private key. Messages are encrypted with the public key and can be decrypted with the private key.
+
+To use TLS transport encryption, you need two kinds of key pairs, **server key pairs** and a **certificate authority**.
+
+A third kind of key pair, **client key pairs**, are used for [client authentication](security-tls-authentication.md).
+
+The **certificate authority** private key should be stored in a very secure location (a fully encrypted, disconnected, air gapped computer). The certificate authority public key, the **trust cert**, can be freely shared.
+
+For both client and server key pairs, the administrator first generates a private key and a certificate request. Then the certificate authority private key is used to sign the certificate request, generating a certificate. This certificate is the public key for the server/client key pair.
+
+For TLS transport encryption, the clients can use the **trust cert** to verify that the server they are talking to has a key pair that was signed by the certificate authority. A man-in-the-middle attacker would not have access to the certificate authority, so they couldn't create a server with such a key pair.
+
+For TLS authentication, the server uses the **trust cert** to verify that the client has a key pair that was signed by the certificate authority. The Common Name of the **client cert** is then used as the client's role token (see [Overview](security-overview.md)).
+
+## Creating TLS Certificates
+
+Creating TLS certificates for Pulsar involves creating a [certificate authority](#certificate-authority) (CA), [server certificate](#server-certificate), and [client certificate](#client-certificate).
+
+The following is an abridged guide to setting up a certificate authority. For a more detailed guide, there are plenty of resources on the internet. We recommend [this guide](https://jamielinux.com/docs/openssl-certificate-authority/index.html).
+
+### Certificate authority
+
+The first step is to create the certificate for the CA. The CA will be used to sign both the broker and client certificates, in order to ensure that each party will trust the others. The CA should be stored in a very secure location (ideally completely disconnected from networks, air gapped, and fully encrypted).
+
+Create a directory for your CA, and place [this openssl configuration file](https://github.com/apache/pulsar/tree/master/site2/website/static/examples/openssl.cnf) in the directory. You may want to modify the default answers for company name and department in the configuration file. Export the location of the CA directory to the environment variable, CA_HOME. The configuration file uses this environment variable to find the rest of the files and directories needed for the CA.
+
+```bash
+$ mkdir my-ca
+$ cd my-ca
+$ wget https://raw.githubusercontent.com/apache/pulsar/master/site2/website/static/examples/openssl.cnf
+$ export CA_HOME=$(pwd)
+```
+
+Create the necessary directories, keys and certs.
+
+```bash
+$ mkdir certs crl newcerts private
+$ chmod 700 private/
+$ touch index.txt
+$ echo 1000 > serial
+$ openssl genrsa -aes256 -out private/ca.key.pem 4096
+$ chmod 400 private/ca.key.pem
+$ openssl req -config openssl.cnf -key private/ca.key.pem \
+      -new -x509 -days 7300 -sha256 -extensions v3_ca \
+      -out certs/ca.cert.pem
+$ chmod 444 certs/ca.cert.pem
+```
+
+After answering the question prompts, this will store CA-related files in the `./my-ca` directory. Within that directory:
+
+* `certs/ca.cert.pem` is the public certificate. It is meant to be distributed to all parties involved.
+* `private/ca.key.pem` is the private key. This is only needed when signing a new certificate for either broker or clients and it must be safely guarded.
+
+### Server certificate
+
+Once a CA certificate has been created, you can create certificate requests and sign them with the CA.
+
+The following commands will ask you a few questions and then create the certificates. When asked for the common name, you should match the hostname of the broker. You could also use a wildcard to match a group of broker hostnames, for example `*.broker.usw.example.com`. This ensures that the same certificate can be reused on multiple machines.
+
+> #### Tips
+> 
+> Sometimes it is not possible or makes no sense to match the hostname,
+> such as when the brokers are created with random hostnames, or you
+> plan to connect to the hosts via their IP. In this case, the client
+> should be configured to disable TLS hostname verification. For more
+> details, see [the host verification section in client configuration](#hostname-verification).
+
+First generate the key.
+```bash
+$ openssl genrsa -out broker.key.pem 2048
+```
+
+The broker expects the key to be in [PKCS 8](https://en.wikipedia.org/wiki/PKCS_8) format, so convert it.
+
+```bash
+$ openssl pkcs8 -topk8 -inform PEM -outform PEM \
+      -in broker.key.pem -out broker.key-pk8.pem -nocrypt
+```
+
+Generate the certificate request...
+
+```bash
+$ openssl req -config openssl.cnf \
+      -key broker.key.pem -new -sha256 -out broker.csr.pem
+```
+
+... and sign it with the certificate authority.
+```bash
+$ openssl ca -config openssl.cnf -extensions server_cert \
+      -days 1000 -notext -md sha256 \
+      -in broker.csr.pem -out broker.cert.pem
+```
+
+At this point, you have a cert, `broker.cert.pem`, and a key, `broker.key-pk8.pem`, which can be used along with `ca.cert.pem` to configure TLS transport encryption for your broker and proxy nodes.
+
+## Broker Configuration
+
+To configure a Pulsar [broker](reference-terminology.md#broker) to use TLS transport encryption, you'll need to make some changes to `broker.conf`, which is located in the `conf` directory of your [Pulsar installation](getting-started-standalone.md).
+
+Add these values to the configuration file (substituting the appropriate certificate paths where necessary):
+
+```properties
+tlsEnabled=true
+tlsCertificateFilePath=/path/to/broker.cert.pem
+tlsKeyFilePath=/path/to/broker.key-pk8.pem
+tlsTrustCertsFilePath=/path/to/ca.cert.pem
+```
+
+> A full list of parameters available in the `conf/broker.conf` file,
+> as well as the default values for those parameters, can be found in [Broker Configuration](reference-configuration.md#broker) 
+
+### TLS Protocol Version and Cipher
+
+The broker (and proxy) can be configured to require specific TLS protocol versions and ciphers for TLS negotiation. This can be used to stop clients from requesting downgraded TLS protocol versions or ciphers which may have weaknesses.
+
+Both the TLS protocol versions and cipher properties can take multiple values, separated by commas. The possible values for protocol version and ciphers depend on the TLS provider being used. Pulsar uses OpenSSL if available, but if not defaults back to the JDK implementation.
+
+```properties
+tlsProtocols=TLSv1.2,TLSv1.1
+tlsCiphers=TLS_DH_RSA_WITH_AES_256_GCM_SHA384,TLS_DH_RSA_WITH_AES_256_CBC_SHA
+```
+
+OpenSSL currently supports ```SSL2```, ```SSL3```, ```TLSv1```, ```TLSv1.1``` and ```TLSv1.2``` for the protocol version. A list of supported ciphers can be obtained from the `openssl ciphers` command.
+
+For JDK 8, a list of supported values can be obtained from the documentation:
+- [TLS protocol](https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#SSLContext)
+- [Ciphers](https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites)
+
+## Proxy Configuration
+
+Proxies need to configure TLS in two directions, for clients connecting to the proxy, and for the proxy to be able to connect to brokers.
+
+```properties
+# For clients connecting to the proxy
+tlsEnabledInProxy=true
+tlsCertificateFilePath=/path/to/broker.cert.pem
+tlsKeyFilePath=/path/to/broker.key-pk8.pem
+tlsTrustCertsFilePath=/path/to/ca.cert.pem
+
+# For the proxy to connect to brokers
+tlsEnabledWithBroker=true
+brokerClientTrustCertsFilePath=/path/to/ca.cert.pem
+```
+
+## Client configuration
+
+When TLS transport encryption is enabled, you need to configure the client to use ```https://``` and port 8443 for the web service URL, and ```pulsar+ssl://``` and port 6651 for the broker service URL.
+
+As the server certificate you generated above doesn't belong to any of the default trust chains, you also need to either specify the path to the **trust cert** (recommended) or tell the client to allow untrusted server certs.
+
+#### Hostname verification
+
+Hostname verification is a TLS security feature whereby a client can refuse to connect to a server if the "CommonName" does not match the hostname to which it is connecting. By default, Pulsar clients disable hostname verification, as it requires that each broker has a DNS record and a unique cert.
+
+Moreover, as the administrator has full control of the certificate authority, it is unlikely that a bad actor would be able to pull off a man-in-the-middle attack. "allowInsecureConnection" allows the client to connect to servers whose cert has not been signed by an approved CA. It is disabled by default in the client, and should always be disabled in production environments. As long as "allowInsecureConnection" is disabled, a man-in-the-middle attack would require the attacker to have access to the CA.
+
+One scenario where you may want to enable hostname verification is where you have multiple proxy nodes behind a VIP, and the VIP has a DNS record, for example, pulsar.mycompany.com. In this case, you can generate a TLS cert with pulsar.mycompany.com as the "CommonName," and then enable hostname verification on the client.
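+
+In that VIP scenario, for example, the CLI tools' `conf/client.conf` could enable verification along the lines of the following sketch (the hostname is hypothetical):
+
+```properties
+webServiceUrl=https://pulsar.mycompany.com:8443/
+brokerServiceUrl=pulsar+ssl://pulsar.mycompany.com:6651/
+useTls=true
+tlsTrustCertsFilePath=/path/to/ca.cert.pem
+tlsEnableHostnameVerification=true
+```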
+
+The examples below show hostname verification being disabled for the Java client, though you can omit this since the client disables it by default. The C++ and Python clients do not allow this to be configured at the moment.
+
+### CLI tools
+
+[Command-line tools](reference-cli-tools.md) like [`pulsar-admin`](reference-cli-tools#pulsar-admin), [`pulsar-perf`](reference-cli-tools#pulsar-perf), and [`pulsar-client`](reference-cli-tools#pulsar-client) use the `conf/client.conf` config file in a Pulsar installation.
+
+You'll need to add the following parameters to that file to use TLS transport with Pulsar's CLI tools:
+
+```properties
+webServiceUrl=https://broker.example.com:8443/
+brokerServiceUrl=pulsar+ssl://broker.example.com:6651/
+useTls=true
+tlsAllowInsecureConnection=false
+tlsTrustCertsFilePath=/path/to/ca.cert.pem
+tlsEnableHostnameVerification=false
+```
+
+### Java client
+
+```java
+import org.apache.pulsar.client.api.PulsarClient;
+
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar+ssl://broker.example.com:6651/")
+    .enableTls(true)
+    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
+    .enableTlsHostnameVerification(false) // false by default, in any case
+    .allowTlsInsecureConnection(false) // false by default, in any case
+    .build();
+```
+
+### Python client
+
+```python
+from pulsar import Client
+
+client = Client("pulsar+ssl://broker.example.com:6651/",
+                tls_trust_certs_file_path="/path/to/ca.cert.pem",
+                tls_allow_insecure_connection=False)  # defaults to False from v2.2.0 onwards
+```
+
+### C++ client
+
+```c++
+#include <pulsar/Client.h>
+
+pulsar::ClientConfiguration config;
+config.setUseTls(true);
+config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");
+config.setTlsAllowInsecureConnection(false); // defaults to false from v2.2.0 onwards
+
+pulsar::Client client("pulsar+ssl://broker.example.com:6651/", config);
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/security-token-admin.md b/site2/website/versioned_docs/version-2.2.1/security-token-admin.md
new file mode 100644
index 0000000000..4caea6282e
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/security-token-admin.md
@@ -0,0 +1,153 @@
+---
+id: version-2.2.1-security-token-admin
+title: Token authentication admin
+sidebar_label: Token authentication admin
+original_id: security-token-admin
+---
+
+## Token Authentication Overview
+
+Pulsar supports authenticating clients using security tokens that are based on
+[JSON Web Tokens](https://jwt.io/introduction/) ([RFC-7519](https://tools.ietf.org/html/rfc7519)).
+
+Tokens are used to identify a Pulsar client and associate it with some "principal" (or "role") which
+is then granted permissions to perform certain actions (e.g. publish to or consume from a topic).
+
+A user will typically be given a token string by an administrator (or some automated service).
+
+The compact representation of a signed JWT is a string that looks like:
+
+```
+eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY
+```
+
+The application specifies the token when creating the client instance. An alternative is to pass
+a "token supplier", that is, a function that returns the token whenever the client library
+needs one.
+
+> #### Always use TLS transport encryption
+> Sending a token is equivalent to sending a password over the wire. It is strongly recommended to
+> always use TLS encryption when talking to the Pulsar service. See
+> [Transport Encryption using TLS](security-tls-transport.md)
+
+## Secret vs Public/Private keys
+
+JWT supports two different kinds of keys for generating and validating tokens:
+
+ * Symmetric:
+    - a single ***Secret*** key is used both to generate and validate tokens
+ * Asymmetric: there is a pair of keys.
+    - the ***Private*** key is used to generate tokens
+    - the ***Public*** key is used to validate tokens
+
+### Secret key
+
+When using a secret key, the administrator creates the key and uses it to generate the client
+tokens. The same key is also configured on the brokers so that they can validate clients.
+
+#### Creating a secret key
+
+```shell
+$ bin/pulsar tokens create-secret-key --output my-secret.key
+```
+
+### Public/Private keys
+
+With public/private keys, we need to create a key pair.
+
+#### Creating a key pair
+
+```shell
+$ bin/pulsar tokens create-key-pair --output-private-key my-private.key --output-public-key my-public.key
+```
+
+ * `my-private.key` will be stored in a safe location and used only by the administrator to generate
+   new tokens.
+ * `my-public.key` will be distributed to all Pulsar brokers. This file can be publicly shared without
+   any security concern.
+
+## Generating tokens
+
+A token is the credential associated with a user. The association is done through the "principal",
+or "role". In the case of JWT tokens, this field is typically referred to as the **subject**, though
+it's exactly the same concept.
+
+The generated token must therefore have a **subject** field set.
+
+```shell
+$ bin/pulsar tokens create --secret-key file:///path/to/my-secret.key \
+            --subject test-user
+```
+
+This will print the token string on stdout.
+
+Similarly, one can create a token by passing the "private" key:
+
+```shell
+$ bin/pulsar tokens create --private-key file:///path/to/my-private.key \
+            --subject test-user
+```
+
+Finally, a token can also be created with a pre-defined TTL. After that time,
+the token will be automatically invalidated.
+
+```shell
+$ bin/pulsar tokens create --secret-key file:///path/to/my-secret.key \
+            --subject test-user \
+            --expiry-time 1y
+```
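+
+In JWT terms, the TTL maps onto the standard `exp` (expiration) claim. A rough equivalent
+under the same JJWT assumption (`secretKey` is again a stand-in):
+
+```java
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+
+import io.jsonwebtoken.Jwts;
+
+// Rough equivalent of --expiry-time 1y: set the standard "exp" claim.
+String token = Jwts.builder()
+        .setSubject("test-user")
+        .setExpiration(Date.from(Instant.now().plus(365, ChronoUnit.DAYS)))
+        .signWith(secretKey)           // stand-in: key loaded elsewhere
+        .compact();
+// Once expired, parsing the token throws ExpiredJwtException.
+```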
+
+## Authorization
+
+The token itself doesn't have any permissions associated with it; those are determined by the
+authorization engine. Once the token is created, you can grant permissions for its role to
+perform certain actions. For example:
+
+```shell
+$ bin/pulsar-admin namespaces grant-permission my-tenant/my-namespace \
+            --role test-user \
+            --actions produce,consume
+```
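+
+The same grant can also be issued programmatically via the Java admin API. A minimal sketch;
+the URL and admin token are placeholders:
+
+```java
+import java.util.EnumSet;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.common.policies.data.AuthAction;
+
+PulsarAdmin admin = PulsarAdmin.builder()
+        .serviceHttpUrl("http://broker.example.com:8080")
+        .authentication(AuthenticationFactory.token("<admin-token>"))
+        .build();
+
+// Grant produce/consume on the namespace to the "test-user" role.
+admin.namespaces().grantPermissionOnNamespace(
+        "my-tenant/my-namespace", "test-user",
+        EnumSet.of(AuthAction.produce, AuthAction.consume));
+```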
+
+## Enabling Token Authentication ...
+
+### ... on Brokers
+
+To configure brokers to authenticate clients, put the following in `broker.conf`:
+
+```properties
+# Configuration to enable authentication and authorization
+authenticationEnabled=true
+authorizationEnabled=true
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+
+# If using secret key
+tokenSecretKey=file:///path/to/secret.key
+# The key can also be passed inline:
+# tokenSecretKey=data:base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU=
+
+# If using public/private
+# tokenPublicKey=file:///path/to/public.key
+```
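+
+The `data:base64,` payload is simply the raw key bytes, base64-encoded. For instance, a small
+Java sketch to produce it (the path is a placeholder):
+
+```java
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Base64;
+
+// Encode the raw secret key bytes for inline use in broker.conf.
+byte[] key = Files.readAllBytes(Paths.get("/path/to/secret.key"));
+System.out.println("tokenSecretKey=data:base64,"
+        + Base64.getEncoder().encodeToString(key));
+```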
+
+### ... on Proxies
+
+The proxy uses its own token when talking to brokers. The role associated with this token
+should be configured in the ``proxyRoles`` setting of the brokers. See the
+[authorization guide](security-authorization.md) for more details.
+
+To configure proxies to authenticate clients, put the following in `proxy.conf`:
+
+```properties
+# For clients connecting to the proxy
+authenticationEnabled=true
+authorizationEnabled=true
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+tokenSecretKey=file:///path/to/secret.key
+
+# For the proxy to connect to brokers
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
+brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw
+# Or, alternatively, read token from file
+# brokerClientAuthenticationParameters=file:///path/to/proxy-token.txt
+```
diff --git a/site2/website/versioned_docs/version-2.2.1/security-token-client.md b/site2/website/versioned_docs/version-2.2.1/security-token-client.md
new file mode 100644
index 0000000000..9f167390bb
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.2.1/security-token-client.md
@@ -0,0 +1,126 @@
+---
+id: version-2.2.1-security-token-client
+title: Client Authentication using tokens
+sidebar_label: Client Authentication using tokens
+original_id: security-token-client
+---
+
+## Token Authentication Overview
+
+Pulsar supports authenticating clients using security tokens that are based on
+[JSON Web Tokens](https://jwt.io/introduction/) ([RFC-7519](https://tools.ietf.org/html/rfc7519)).
+
+Tokens are used to identify a Pulsar client and associate it with some "principal" (or "role"), which
+is then granted permission to perform certain actions (e.g., publish to or consume from a topic).
+
+A user will typically be given a token string by an administrator (or some automated service).
+
+The compact representation of a signed JWT is a string that looks like:
+
+```
+eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY
+```
+
+The application specifies the token when creating the client instance. Alternatively, you can
+pass a "token supplier": a function that the client library invokes whenever it needs a token.
+
+See [Token authentication admin](security-token-admin.md) for a reference on how to enable token
+authentication on a Pulsar cluster.
+
+### CLI tools
+
+[Command-line tools](reference-cli-tools.md) like [`pulsar-admin`](reference-pulsar-admin.md), [`pulsar-perf`](reference-cli-tools.md#pulsar-perf), and [`pulsar-client`](reference-cli-tools.md#pulsar-client) use the `conf/client.conf` config file in a Pulsar installation.
+
+You'll need to add the following parameters to that file to use token authentication with
+Pulsar's CLI tools:
+
+```properties
+webServiceUrl=http://broker.example.com:8080/
+brokerServiceUrl=pulsar://broker.example.com:6650/
+authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
+authParams=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY
+```
+
+The token string can also be read from a file, e.g.:
+
+```
+authParams=file:///path/to/token/file
+```
+
+### Java client
+
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(
+        AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"))
+    .build();
+```
+
+Similarly, one can also pass a `Supplier`:
+
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(
+        AuthenticationFactory.token(() -> {
+            // Read token from custom source
+            return readToken();
+        }))
+    .build();
+```
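+
+`readToken()` above is a placeholder. One plausible implementation, assuming the token is
+stored in a local file:
+
+```java
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+// Hypothetical helper: read the token from a file, trimming the
+// trailing newline that token files usually carry.
+static String readToken() {
+    try {
+        return new String(Files.readAllBytes(Paths.get("/path/to/token.txt"))).trim();
+    } catch (IOException e) {
+        throw new UncheckedIOException(e);
+    }
+}
+```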
+
+### Python client
+
+```python
+from pulsar import Client, AuthenticationToken
+
+client = Client('pulsar://broker.example.com:6650/',
+                authentication=AuthenticationToken('eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY'))
+```
+
+Alternatively, with a supplier:
+
+```python
+def read_token():
+    with open('/path/to/token.txt') as tf:
+        return tf.read().strip()
+
+client = Client('pulsar://broker.example.com:6650/',
+                authentication=AuthenticationToken(read_token))
+```
+
+### Go client
+
+```go
+import "github.com/apache/pulsar/pulsar-client-go/pulsar"
+
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+	URL:            "pulsar://localhost:6650",
+	Authentication: pulsar.NewAuthenticationToken("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"),
+})
+```
+
+Alternatively, with a supplier:
+
+```go
+client, err := pulsar.NewClient(pulsar.ClientOptions{
+	URL:            "pulsar://localhost:6650",
+	Authentication: pulsar.NewAuthenticationTokenSupplier(func() string {
+		// Read token from custom source
+		return readToken()
+	}),
+})
+```
+
+### C++ client
+
+```c++
+#include <pulsar/Client.h>
+
+pulsar::ClientConfiguration config;
+config.setAuth(pulsar::AuthToken::createWithToken("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"));
+
+pulsar::Client client("pulsar://broker.example.com:6650/", config);
+```
diff --git a/site2/website/versioned_sidebars/version-2.2.1-sidebars.json b/site2/website/versioned_sidebars/version-2.2.1-sidebars.json
new file mode 100644
index 0000000000..a18cdc61eb
--- /dev/null
+++ b/site2/website/versioned_sidebars/version-2.2.1-sidebars.json
@@ -0,0 +1,123 @@
+{
+  "version-2.2.1-docs": {
+    "Getting started": [
+      "version-2.2.1-pulsar-2.0",
+      "version-2.2.1-standalone",
+      "version-2.2.1-standalone-docker",
+      "version-2.2.1-client-libraries"
+    ],
+    "Concepts and Architecture": [
+      "version-2.2.1-concepts-overview",
+      "version-2.2.1-concepts-messaging",
+      "version-2.2.1-concepts-architecture-overview",
+      "version-2.2.1-concepts-clients",
+      "version-2.2.1-concepts-replication",
+      "version-2.2.1-concepts-multi-tenancy",
+      "version-2.2.1-concepts-authentication",
+      "version-2.2.1-concepts-topic-compaction",
+      "version-2.2.1-concepts-tiered-storage",
+      "version-2.2.1-concepts-schema-registry"
+    ],
+    "Pulsar Functions": [
+      "version-2.2.1-functions-overview",
+      "version-2.2.1-functions-quickstart",
+      "version-2.2.1-functions-api",
+      "version-2.2.1-functions-deploying",
+      "version-2.2.1-functions-guarantees",
+      "version-2.2.1-functions-state",
+      "version-2.2.1-functions-metrics"
+    ],
+    "Pulsar IO": [
+      "version-2.2.1-io-overview",
+      "version-2.2.1-io-quickstart",
+      "version-2.2.1-io-managing",
+      "version-2.2.1-io-connectors",
+      "version-2.2.1-io-develop"
+    ],
+    "Pulsar SQL": [
+      "version-2.2.1-sql-overview",
+      "version-2.2.1-sql-getting-started",
+      "version-2.2.1-sql-deployment-configurations"
+    ],
+    "Deployment": [
+      "version-2.2.1-deploy-aws",
+      "version-2.2.1-deploy-kubernetes",
+      "version-2.2.1-deploy-bare-metal",
+      "version-2.2.1-deploy-bare-metal-multi-cluster",
+      "version-2.2.1-deploy-dcos",
+      "version-2.2.1-deploy-monitoring"
+    ],
+    "Administration": [
+      "version-2.2.1-administration-zk-bk",
+      "version-2.2.1-administration-geo",
+      "version-2.2.1-administration-dashboard",
+      "version-2.2.1-administration-stats",
+      "version-2.2.1-administration-load-distribution",
+      "version-2.2.1-administration-proxy"
+    ],
+    "Security": [
+      "version-2.2.1-security-overview",
+      "version-2.2.1-security-tls-transport",
+      "version-2.2.1-security-tls-authentication",
+      "version-2.2.1-security-token-client",
+      "version-2.2.1-security-token-admin",
+      "version-2.2.1-security-athenz",
+      "version-2.2.1-security-authorization",
+      "version-2.2.1-security-encryption",
+      "version-2.2.1-security-extending"
+    ],
+    "Client libraries": [
+      "version-2.2.1-client-libraries-java",
+      "version-2.2.1-client-libraries-go",
+      "version-2.2.1-client-libraries-python",
+      "version-2.2.1-client-libraries-cpp",
+      "version-2.2.1-client-libraries-websocket"
+    ],
+    "Admin API": [
+      "version-2.2.1-admin-api-overview",
+      "version-2.2.1-admin-api-clusters",
+      "version-2.2.1-admin-api-tenants",
+      "version-2.2.1-admin-api-brokers",
+      "version-2.2.1-admin-api-namespaces",
+      "version-2.2.1-admin-api-permissions",
+      "version-2.2.1-admin-api-persistent-topics",
+      "version-2.2.1-admin-api-non-persistent-topics",
+      "version-2.2.1-admin-api-partitioned-topics",
+      "version-2.2.1-admin-api-schemas"
+    ],
+    "Adaptors": [
+      "version-2.2.1-adaptors-kafka",
+      "version-2.2.1-adaptors-spark",
+      "version-2.2.1-adaptors-storm"
+    ],
+    "Cookbooks": [
+      "version-2.2.1-cookbooks-tiered-storage",
+      "version-2.2.1-cookbooks-compaction",
+      "version-2.2.1-cookbooks-deduplication",
+      "version-2.2.1-cookbooks-non-persistent",
+      "version-2.2.1-cookbooks-partitioned",
+      "version-2.2.1-cookbooks-retention-expiry",
+      "version-2.2.1-cookbooks-encryption",
+      "version-2.2.1-cookbooks-message-queue"
+    ],
+    "Development": [
+      "version-2.2.1-develop-tools",
+      "version-2.2.1-develop-binary-protocol",
+      "version-2.2.1-develop-schema",
+      "version-2.2.1-develop-load-manager",
+      "version-2.2.1-develop-cpp"
+    ],
+    "Reference": [
+      "version-2.2.1-reference-terminology",
+      "version-2.2.1-reference-cli-tools",
+      "version-2.2.1-pulsar-admin",
+      "version-2.2.1-reference-configuration"
+    ]
+  },
+  "version-2.2.1-docs-other": {
+    "First Category": [
+      "version-2.2.1-doc4",
+      "version-2.2.1-doc5"
+    ]
+  }
+}
diff --git a/site2/website/versions.json b/site2/website/versions.json
index e05d087c05..0ec7a518a2 100644
--- a/site2/website/versions.json
+++ b/site2/website/versions.json
@@ -1,4 +1,5 @@
 [
+  "2.2.1",
   "2.2.0",
   "2.1.1-incubating",
   "2.1.0-incubating"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
