kafka-commits mailing list archives

From ewe...@apache.org
Subject [kafka] branch trunk updated: KAFKA-4029: SSL support for Connect REST API (KIP-208)
Date Tue, 30 Jan 2018 23:09:51 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3431be2  KAFKA-4029: SSL support for Connect REST API (KIP-208)
3431be2 is described below

commit 3431be2aeb4f16599c419f89360856b72d0d335d
Author: Jakub Scholz <www@scholzj.com>
AuthorDate: Tue Jan 30 15:09:40 2018 -0800

    KAFKA-4029: SSL support for Connect REST API (KIP-208)
    
    This PR implements the JIRA issue [KAFKA-4029: SSL support for Connect REST API](https://issues.apache.org/jira/browse/KAFKA-4029) / [KIP-208](https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface).
    
    Summary of the main changes:
    - Jetty `HttpClient` is used as the HTTP client instead of the one shipped with Java. That keeps the SSL configuration for the server and the client in a single place (both use the Jetty `SslContextFactory`). It also offers much richer configuration than the JDK client (for example, it is easier to configure the supported cipher suites).
    - The `RestServer` class has been broken into 3 parts. `RestServer` contains the server itself, `RestClient` contains the HTTP client used for forwarding requests, and `SSLUtils` contains some helpers for configuring SSL. One of the reasons for this was Findbugs complaining about the class complexity.
    - A new method `valuesWithPrefixAllOrNothing` has been added to `AbstractConfig` to make it easier to handle the case where we want to use either only the prefixed SSL options or only the non-prefixed ones, but not a mix of both.
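
The all-or-nothing lookup described in the last item can be illustrated with plain maps. This is only a rough sketch, not the committed code: the class `AllOrNothingSketch` and its static helper are hypothetical stand-ins, and unlike the real `AbstractConfig` method the sketch does not parse values against a `ConfigDef` or drop unknown keys. The property names are borrowed from the unit test added in this commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AbstractConfig#valuesWithPrefixAllOrNothing.
// Assumption: configs are modelled as a plain map, with no ConfigDef parsing.
public class AllOrNothingSketch {

    static Map<String, Object> valuesWithPrefixAllOrNothing(Map<String, Object> originals, String prefix) {
        Map<String, Object> withPrefix = new HashMap<>();
        for (Map.Entry<String, Object> entry : originals.entrySet()) {
            String key = entry.getKey();
            // Collect prefixed entries and strip the prefix from their keys
            if (key.startsWith(prefix) && key.length() > prefix.length())
                withPrefix.put(key.substring(prefix.length()), entry.getValue());
        }
        // No prefixed key at all: fall back to the full, unprefixed configuration
        if (withPrefix.isEmpty())
            return new HashMap<>(originals);
        // At least one prefixed key: use only the prefixed values, never a mix
        return withPrefix;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("sasl.mechanism", "PLAIN");
        props.put("prefix1.sasl.mechanism", "GSSAPI");
        props.put("ssl.keymanager.algorithm", "algorithm");

        // "prefix1." matches one key, so only the prefixed value is returned
        System.out.println(valuesWithPrefixAllOrNothing(props, "prefix1."));
        // "prefix2." matches nothing, so the whole map is returned unchanged
        System.out.println(valuesWithPrefixAllOrNothing(props, "prefix2.").size());
    }
}
```

With `prefix1.` the non-prefixed `ssl.keymanager.algorithm` is deliberately absent from the result, which is the "not mixed" behaviour the item describes.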
    
    Author: Jakub Scholz <www@scholzj.com>
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4429 from scholzj/kip-208
---
 build.gradle                                       |   1 +
 .../apache/kafka/common/config/AbstractConfig.java |  25 +++
 .../kafka/common/config/AbstractConfigTest.java    |  28 +++
 .../apache/kafka/connect/runtime/WorkerConfig.java |  25 ++-
 .../runtime/distributed/DistributedHerder.java     |   7 +-
 .../kafka/connect/runtime/rest/RestClient.java     | 158 +++++++++++++++
 .../kafka/connect/runtime/rest/RestServer.java     | 224 +++++++++++----------
 .../runtime/rest/resources/ConnectorsResource.java |  15 +-
 .../kafka/connect/runtime/rest/util/SSLUtils.java  | 152 ++++++++++++++
 .../kafka/connect/runtime/rest/RestServerTest.java | 100 +++++++--
 .../resources/ConnectorPluginsResourceTest.java    |   9 +-
 .../rest/resources/ConnectorsResourceTest.java     |  49 ++---
 .../connect/runtime/rest/util/SSLUtilsTest.java    | 126 ++++++++++++
 docs/connect.html                                  |  38 +++-
 gradle/dependencies.gradle                         |   1 +
 15 files changed, 798 insertions(+), 160 deletions(-)

diff --git a/build.gradle b/build.gradle
index aedd943..430d989 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1201,6 +1201,7 @@ project(':connect:runtime') {
     compile libs.jettyServer
     compile libs.jettyServlet
     compile libs.jettyServlets
+    compile libs.jettyClient
     compile(libs.reflections)
     compile(libs.mavenArtifact)
 
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 427c492..ce3ae43 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -218,6 +218,31 @@ public class AbstractConfig {
         return result;
     }
 
+    /**
+     * If at least one key with {@code prefix} exists, all prefixed values will be parsed and put into map.
+     * If no value with {@code prefix} exists all unprefixed values will be returned.
+     *
+     * This is useful if one wants to allow prefixed configs to override default ones, but wants to use either
+     * only prefixed configs or only regular configs, but not mix them.
+     */
+    public Map<String, Object> valuesWithPrefixAllOrNothing(String prefix) {
+        Map<String, Object> withPrefix = originalsWithPrefix(prefix, true);
+
+        if (withPrefix.isEmpty()) {
+            return new RecordingMap<>(values(), "", true);
+        } else {
+            Map<String, Object> result = new RecordingMap<>(prefix, true);
+
+            for (Map.Entry<String, ?> entry : withPrefix.entrySet()) {
+                ConfigDef.ConfigKey configKey = definition.configKeys().get(entry.getKey());
+                if (configKey != null)
+                    result.put(entry.getKey(), definition.parseValue(configKey, entry.getValue(), true));
+            }
+
+            return result;
+        }
+    }
+
     public Map<String, ?> values() {
         return new RecordingMap<>(values);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 2e15715..074df10 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -140,6 +140,34 @@ public class AbstractConfigTest {
     }
 
     @Test
+    public void testValuesWithPrefixAllOrNothing() {
+        String prefix1 = "prefix1.";
+        String prefix2 = "prefix2.";
+        Properties props = new Properties();
+        props.put("sasl.mechanism", "PLAIN");
+        props.put("prefix1.sasl.mechanism", "GSSAPI");
+        props.put("prefix1.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
+        props.put("prefix1.ssl.truststore.location", "my location");
+        props.put("sasl.kerberos.service.name", "service name");
+        props.put("ssl.keymanager.algorithm", "algorithm");
+        TestSecurityConfig config = new TestSecurityConfig(props);
+        Map<String, Object> valuesWithPrefixAllOrNothing1 = config.valuesWithPrefixAllOrNothing(prefix1);
+
+        // All prefixed values are there
+        assertEquals("GSSAPI", valuesWithPrefixAllOrNothing1.get("sasl.mechanism"));
+        assertEquals("/usr/bin/kinit2", valuesWithPrefixAllOrNothing1.get("sasl.kerberos.kinit.cmd"));
+        assertEquals("my location", valuesWithPrefixAllOrNothing1.get("ssl.truststore.location"));
+
+        // Non-prefixed values are missing
+        assertFalse(valuesWithPrefixAllOrNothing1.containsKey("sasl.kerberos.service.name"));
+        assertFalse(valuesWithPrefixAllOrNothing1.containsKey("ssl.keymanager.algorithm"));
+
+        Map<String, Object> valuesWithPrefixAllOrNothing2 = config.valuesWithPrefixAllOrNothing(prefix2);
+        assertTrue(valuesWithPrefixAllOrNothing2.containsKey("sasl.kerberos.service.name"));
+        assertTrue(valuesWithPrefixAllOrNothing2.containsKey("ssl.keymanager.algorithm"));
+    }
+
+    @Test
     public void testUnused() {
         Properties props = new Properties();
         String configValue = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index dfae761..3b99c6b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Sensor;
 
 import java.util.ArrayList;
@@ -99,15 +100,30 @@ public class WorkerConfig extends AbstractConfig {
             + "data to be committed in a future attempt.";
     public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
 
+    /**
+     * @deprecated As of 1.1.0.
+     */
+    @Deprecated
     public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
     private static final String REST_HOST_NAME_DOC
             = "Hostname for the REST API. If this is set, it will only bind to this interface.";
 
+    /**
+     * @deprecated As of 1.1.0.
+     */
+    @Deprecated
     public static final String REST_PORT_CONFIG = "rest.port";
     private static final String REST_PORT_DOC
             = "Port for the REST API to listen on.";
     public static final int REST_PORT_DEFAULT = 8083;
 
+    public static final String LISTENERS_CONFIG = "listeners";
+    private static final String LISTENERS_DOC
+            = "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
+            " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
+            " Leave hostname empty to bind to default interface.\n" +
+            " Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";
+
     public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
     private static final String REST_ADVERTISED_HOST_NAME_DOC
             = "If this is set, this is the hostname that will be given out to other workers to connect to.";
@@ -116,6 +132,10 @@ public class WorkerConfig extends AbstractConfig {
     private static final String REST_ADVERTISED_PORT_DOC
             = "If this is set, this is the port that will be given out to other workers to connect to.";
 
+    public static final String REST_ADVERTISED_LISTENER_CONFIG = "rest.advertised.listener";
+    private static final String REST_ADVERTISED_LISTENER_DOC
+            = "Sets the advertised listener (HTTP or HTTPS) which will be given to other workers to use.";
+
     public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
     protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
             "Value to set the Access-Control-Allow-Origin header to for REST API requests." +
@@ -173,8 +193,10 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
                 .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
                 .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
+                .define(LISTENERS_CONFIG, Type.LIST, null, Importance.LOW, LISTENERS_DOC)
                 .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
                 .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
+                .define(REST_ADVERTISED_LISTENER_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_LISTENER_DOC)
                 .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
                         ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
                         ACCESS_CONTROL_ALLOW_ORIGIN_DOC)
@@ -199,7 +221,8 @@ public class WorkerConfig extends AbstractConfig {
                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
                         "", Importance.LOW,
-                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC);
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index c39ee67..c6e6a47 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -42,6 +42,7 @@ import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -146,6 +147,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private boolean needsReconfigRebalance;
     private volatile int generation;
 
+    private final DistributedConfig config;
+
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
@@ -186,6 +189,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 });
         this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
         this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);
+        this.config = config;
 
         stopping = new AtomicBoolean(false);
         configState = ClusterConfigState.EMPTY;
@@ -710,7 +714,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return generation;
     }
 
-
     // Should only be called from work thread, so synchronization should not be needed
     private boolean isLeader() {
         return assignment != null && member.memberId().equals(assignment.leader());
@@ -1011,7 +1014,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         public void run() {
                             try {
                                 String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
-                                RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
+                                RestClient.httpRequest(reconfigUrl, "POST", taskProps, null, config);
                                 cb.onCompletion(null, null);
                             } catch (ConnectException e) {
                                 log.error("Request to leader to reconfigure connector tasks failed", e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
new file mode 100644
index 0000000..d500ad2
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.util.StringContentProvider;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class RestClient {
+    private static final Logger log = LoggerFactory.getLogger(RestClient.class);
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    /**
+     * Sends HTTP request to remote REST server
+     *
+     * @param url             HTTP connection will be established with this url.
+     * @param method          HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData Object to serialize as JSON and send in the request body.
+     * @param responseFormat  Expected format of the response to the HTTP request.
+     * @param <T>             The type of the deserialized response to the HTTP request.
+     * @return The deserialized response to the HTTP request, or null if no data is expected.
+     */
+    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
+                                                  TypeReference<T> responseFormat, WorkerConfig config) {
+        HttpClient client;
+
+        if (url.startsWith("https://")) {
+            client = new HttpClient(SSLUtils.createSslContextFactory(config, true));
+        } else {
+            client = new HttpClient();
+        }
+
+        client.setFollowRedirects(false);
+
+        try {
+            client.start();
+        } catch (Exception e) {
+            log.error("Failed to start RestClient: ", e);
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to start RestClient: " + e.getMessage(), e);
+        }
+
+        try {
+            String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
+            log.trace("Sending {} with input {} to {}", method, serializedBody, url);
+
+            Request req = client.newRequest(url);
+            req.method(method);
+            req.accept("application/json");
+            req.agent("kafka-connect");
+            req.content(new StringContentProvider(serializedBody, StandardCharsets.UTF_8), "application/json");
+
+            ContentResponse res = req.send();
+
+            int responseCode = res.getStatus();
+            System.out.println(responseCode);
+            if (responseCode == HttpStatus.NO_CONTENT_204) {
+                return new HttpResponse<>(responseCode, convertHttpFieldsToMap(res.getHeaders()), null);
+            } else if (responseCode >= 400) {
+                ErrorMessage errorMessage = JSON_SERDE.readValue(res.getContentAsString(), ErrorMessage.class);
+                throw new ConnectRestException(responseCode, errorMessage.errorCode(), errorMessage.message());
+            } else if (responseCode >= 200 && responseCode < 300) {
+                T result = JSON_SERDE.readValue(res.getContentAsString(), responseFormat);
+                return new HttpResponse<>(responseCode, convertHttpFieldsToMap(res.getHeaders()), result);
+            } else {
+                throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
+                        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+                        "Unexpected status code when handling forwarded request: " + responseCode);
+            }
+        } catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
+            log.error("IO error forwarding REST request: ", e);
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
+        } finally {
+            if (client != null)
+                try {
+                    client.stop();
+                } catch (Exception e) {
+                    log.error("Failed to stop HTTP client", e);
+                }
+        }
+    }
+
+    /**
+     * Convert response parameters from Jetty format (HttpFields)
+     * @param httpFields
+     * @return
+     */
+    private static Map<String, String> convertHttpFieldsToMap(HttpFields httpFields) {
+        Map<String, String> headers = new HashMap<String, String>();
+
+        if (httpFields == null || httpFields.size() == 0)
+            return headers;
+
+        for (HttpField field : httpFields) {
+            headers.put(field.getName(), field.getValue());
+        }
+
+        return headers;
+    }
+
+    public static class HttpResponse<T> {
+        private int status;
+        private Map<String, String> headers;
+        private T body;
+
+        public HttpResponse(int status, Map<String, String> headers, T body) {
+            this.status = status;
+            this.headers = headers;
+            this.body = body;
+        }
+
+        public int status() {
+            return status;
+        }
+
+        public Map<String, String> headers() {
+            return headers;
+        }
+
+        public T body() {
+            return body;
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 4a2f913..77f6cdf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -16,19 +16,16 @@
  */
 package org.apache.kafka.connect.runtime.rest;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
-import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.runtime.rest.resources.RootResource;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
@@ -42,25 +39,22 @@ import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.CrossOriginFilter;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
 import java.net.URI;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
-
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
@@ -70,7 +64,8 @@ public class RestServer {
 
     private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000;
 
-    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+    private static final String PROTOCOL_HTTP = "http";
+    private static final String PROTOCOL_HTTPS = "https";
 
     private final WorkerConfig config;
     private Server jettyServer;
@@ -81,17 +76,79 @@ public class RestServer {
     public RestServer(WorkerConfig config) {
         this.config = config;
 
-        // To make the advertised port available immediately, we need to do some configuration here
-        String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
-        Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG);
+        List<String> listeners = parseListeners();
 
         jettyServer = new Server();
 
-        ServerConnector connector = new ServerConnector(jettyServer);
-        if (hostname != null && !hostname.isEmpty())
+        createConnectors(listeners);
+    }
+
+    List<String> parseListeners() {
+        List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
+        if (listeners == null || listeners.size() == 0) {
+            String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
+
+            if (hostname == null)
+                hostname = "";
+
+            listeners = Collections.singletonList(String.format("%s://%s:%d", PROTOCOL_HTTP, hostname, config.getInt(WorkerConfig.REST_PORT_CONFIG)));
+        }
+
+        return listeners;
+    }
+
+    /**
+     * Adds Jetty connector for each configured listener
+     */
+    public void createConnectors(List<String> listeners) {
+        List<Connector> connectors = new ArrayList<>();
+
+        for (String listener : listeners) {
+            if (!listener.isEmpty()) {
+                Connector connector = createConnector(listener);
+                connectors.add(connector);
+                log.info("Added connector for " + listener);
+            }
+        }
+
+        jettyServer.setConnectors(connectors.toArray(new Connector[connectors.size()]));
+    }
+
+    /**
+     * Creates Jetty connector according to configuration
+     */
+    public Connector createConnector(String listener) {
+        Pattern listenerPattern = Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
+        Matcher listenerMatcher = listenerPattern.matcher(listener);
+
+        if (!listenerMatcher.matches())
+            throw new ConfigException("Listener doesn't have the right format (protocol://hostname:port).");
+
+        String protocol = listenerMatcher.group(1).toLowerCase(Locale.ENGLISH);
+
+        if (!PROTOCOL_HTTP.equals(protocol) && !PROTOCOL_HTTPS.equals(protocol))
+            throw new ConfigException(String.format("Listener protocol must be either \"%s\" or \"%s\".", PROTOCOL_HTTP, PROTOCOL_HTTPS));
+
+        String hostname = listenerMatcher.group(2);
+        int port = Integer.parseInt(listenerMatcher.group(3));
+
+        ServerConnector connector;
+
+        if (PROTOCOL_HTTPS.equals(protocol)) {
+            SslContextFactory ssl = SSLUtils.createSslContextFactory(config);
+            connector = new ServerConnector(jettyServer, ssl);
+            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTPS, hostname, port));
+        } else {
+            connector = new ServerConnector(jettyServer);
+            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTP, hostname, port));
+        }
+
+        if (!hostname.isEmpty())
             connector.setHost(hostname);
+
         connector.setPort(port);
-        jettyServer.setConnectors(new Connector[]{connector});
+
+        return connector;
     }
 
     public void start(Herder herder) {
@@ -101,7 +158,7 @@ public class RestServer {
         resourceConfig.register(new JacksonJsonProvider());
 
         resourceConfig.register(new RootResource(herder));
-        resourceConfig.register(new ConnectorsResource(herder));
+        resourceConfig.register(new ConnectorsResource(herder, config));
         resourceConfig.register(new ConnectorPluginsResource(herder));
 
         resourceConfig.register(ConnectExceptionMapper.class);
@@ -171,103 +228,56 @@ public class RestServer {
      */
     public URI advertisedUrl() {
         UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
+
+        String advertisedSecurityProtocol = determineAdvertisedProtocol();
+        ServerConnector serverConnector = findConnector(advertisedSecurityProtocol);
+        builder.scheme(advertisedSecurityProtocol);
+
         String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
         if (advertisedHostname != null && !advertisedHostname.isEmpty())
             builder.host(advertisedHostname);
+        else if (serverConnector != null && serverConnector.getHost() != null && serverConnector.getHost().length() > 0)
+            builder.host(serverConnector.getHost());
+
         Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
         if (advertisedPort != null)
             builder.port(advertisedPort);
-        else
-            builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
-        return builder.build();
-    }
-
-    /**
-     * @param url               HTTP connection will be established with this url.
-     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
-     * @param requestBodyData   Object to serialize as JSON and send in the request body.
-     * @param responseFormat    Expected format of the response to the HTTP request.
-     * @param <T>               The type of the deserialized response to the HTTP request.
-     * @return The deserialized response to the HTTP request, or null if no data is expected.
-     */
-    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
-                                    TypeReference<T> responseFormat) {
-        HttpURLConnection connection = null;
-        try {
-            String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
-            log.debug("Sending {} with input {} to {}", method, serializedBody, url);
-
-            connection = (HttpURLConnection) new URL(url).openConnection();
-            connection.setRequestMethod(method);
+        else if (serverConnector != null)
+            builder.port(serverConnector.getPort());
 
-            connection.setRequestProperty("User-Agent", "kafka-connect");
-            connection.setRequestProperty("Accept", "application/json");
+        log.info("Advertised URI: {}", builder.build());
 
-            // connection.getResponseCode() implicitly calls getInputStream, so always set to true.
-            // On the other hand, leaving this out breaks nothing.
-            connection.setDoInput(true);
-
-            connection.setUseCaches(false);
-
-            if (requestBodyData != null) {
-                connection.setRequestProperty("Content-Type", "application/json");
-                connection.setDoOutput(true);
-
-                OutputStream os = connection.getOutputStream();
-                os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
-                os.flush();
-                os.close();
-            }
-
-            int responseCode = connection.getResponseCode();
-            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
-                return new HttpResponse<>(responseCode, connection.getHeaderFields(), null);
-            } else if (responseCode >= 400) {
-                InputStream es = connection.getErrorStream();
-                ErrorMessage errorMessage = JSON_SERDE.readValue(es, ErrorMessage.class);
-                es.close();
-                throw new ConnectRestException(responseCode, errorMessage.errorCode(), errorMessage.message());
-            } else if (responseCode >= 200 && responseCode < 300) {
-                InputStream is = connection.getInputStream();
-                T result = JSON_SERDE.readValue(is, responseFormat);
-                is.close();
-                return new HttpResponse<>(responseCode, connection.getHeaderFields(), result);
-            } else {
-                throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
-                        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
-                        "Unexpected status code when handling forwarded request: " + responseCode);
-            }
-        } catch (IOException e) {
-            log.error("IO error forwarding REST request: ", e);
-            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
-        } finally {
-            if (connection != null)
-                connection.disconnect();
-        }
+        return builder.build();
     }
 
-    public static class HttpResponse<T> {
-        private int status;
-        private Map<String, List<String>> headers;
-        private T body;
-
-        public HttpResponse(int status, Map<String, List<String>> headers, T body) {
-            this.status = status;
-            this.headers = headers;
-            this.body = body;
-        }
-
-        public int status() {
-            return status;
+    String determineAdvertisedProtocol() {
+        String advertisedSecurityProtocol = config.getString(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG);
+        if (advertisedSecurityProtocol == null) {
+            String listeners = (String) config.originals().get(WorkerConfig.LISTENERS_CONFIG);
+
+            if (listeners == null)
+                return PROTOCOL_HTTP;
+            else
+                listeners = listeners.toLowerCase(Locale.ENGLISH);
+
+            if (listeners.contains(String.format("%s://", PROTOCOL_HTTP)))
+                return PROTOCOL_HTTP;
+            else if (listeners.contains(String.format("%s://", PROTOCOL_HTTPS)))
+                return PROTOCOL_HTTPS;
+            else
+                return PROTOCOL_HTTP;
+        } else {
+            return advertisedSecurityProtocol.toLowerCase(Locale.ENGLISH);
         }
+    }
 
-        public Map<String, List<String>> headers() {
-            return headers;
+    ServerConnector findConnector(String protocol) {
+        for (Connector connector : jettyServer.getConnectors()) {
+            if (connector.getName().startsWith(protocol))
+                return (ServerConnector) connector;
         }
 
-        public T body() {
-            return body;
-        }
+        return null;
     }
 
     public static String urlJoin(String base, String path) {
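
Reviewer note (not part of the patch): the fallback order in `determineAdvertisedProtocol()` above can be tried outside of Connect with the minimal sketch below. The class name `AdvertisedProtocolSketch` and the plain string parameters are assumptions made for illustration; the method in the commit reads both values from `WorkerConfig`.

```java
import java.util.Locale;

// Illustrative sketch only; names are hypothetical and not part of the commit.
final class AdvertisedProtocolSketch {
    static final String PROTOCOL_HTTP = "http";
    static final String PROTOCOL_HTTPS = "https";

    // advertisedListener stands in for the advertised-listener setting,
    // listeners for the comma-separated listeners setting; either may be null.
    static String determineAdvertisedProtocol(String advertisedListener, String listeners) {
        // An explicitly advertised protocol always wins.
        if (advertisedListener != null)
            return advertisedListener.toLowerCase(Locale.ENGLISH);

        // No listeners configured: fall back to plain HTTP.
        if (listeners == null)
            return PROTOCOL_HTTP;

        String lower = listeners.toLowerCase(Locale.ENGLISH);
        // HTTP is preferred when both kinds of listener are present.
        // Note that "https://" does not contain the substring "http://".
        if (lower.contains(PROTOCOL_HTTP + "://"))
            return PROTOCOL_HTTP;
        if (lower.contains(PROTOCOL_HTTPS + "://"))
            return PROTOCOL_HTTPS;
        return PROTOCOL_HTTP;
    }

    public static void main(String[] args) {
        String both = "http://localhost:8080,https://localhost:8443";
        System.out.println(determineAdvertisedProtocol(null, both));                        // http
        System.out.println(determineAdvertisedProtocol("HTTPS", both));                     // https
        System.out.println(determineAdvertisedProtocol(null, "https://localhost:8443"));    // https
        System.out.println(determineAdvertisedProtocol(null, null));                        // http
    }
}
```

The four calls in `main` mirror the cases exercised by `testAdvertisedUri` in this commit, restricted to the protocol part of the advertised URL.
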
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 2c03124..7a01168 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -19,9 +19,10 @@ package org.apache.kafka.connect.runtime.rest.resources;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
 import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
@@ -67,11 +68,13 @@ public class ConnectorsResource {
     private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
 
     private final Herder herder;
+    private final WorkerConfig config;
     @javax.ws.rs.core.Context
     private ServletContext context;
 
-    public ConnectorsResource(Herder herder) {
+    public ConnectorsResource(Herder herder, WorkerConfig config) {
         this.herder = herder;
+        this.config = config;
     }
 
     @GET
@@ -257,7 +260,7 @@ public class ConnectorsResource {
                             .build()
                             .toString();
                     log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
-                    return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType));
+                    return translator.translate(RestClient.httpRequest(forwardUrl, method, body, resultType, config));
                 } else {
                     // we should find the right target for the query within two hops, so if
                     // we don't, it probably means that a rebalance has taken place.
@@ -290,19 +293,19 @@ public class ConnectorsResource {
     }
 
     private interface Translator<T, U> {
-        T translate(RestServer.HttpResponse<U> response);
+        T translate(RestClient.HttpResponse<U> response);
     }
 
     private static class IdentityTranslator<T> implements Translator<T, T> {
         @Override
-        public T translate(RestServer.HttpResponse<T> response) {
+        public T translate(RestClient.HttpResponse<T> response) {
             return response.body();
         }
     }
 
     private static class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
         @Override
-        public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) {
+        public Herder.Created<ConnectorInfo> translate(RestClient.HttpResponse<ConnectorInfo> response) {
             boolean created = response.status() == 201;
             return new Herder.Created<>(created, response.body());
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
new file mode 100644
index 0000000..51222e5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.util;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class for setting up SSL for RestServer and RestClient
+ */
+public class SSLUtils {
+    /**
+     * Configures SSL/TLS for HTTPS Jetty Server / Client
+     */
+    public static SslContextFactory createSslContextFactory(WorkerConfig config) {
+        return createSslContextFactory(config, false);
+    }
+
+    /**
+     * Configures SSL/TLS for HTTPS Jetty Server / Client
+     */
+    public static SslContextFactory createSslContextFactory(WorkerConfig config, boolean client) {
+        Map<String, Object> sslConfigValues = config.valuesWithPrefixAllOrNothing("listeners.https.");
+
+        SslContextFactory ssl = new SslContextFactory();
+
+        configureSslContextFactoryKeyStore(ssl, sslConfigValues);
+        configureSslContextFactoryTrustStore(ssl, sslConfigValues);
+        configureSslContextFactoryAlgorithms(ssl, sslConfigValues);
+        configureSslContextFactoryAuthentication(ssl, sslConfigValues);
+
+        if (client)
+            configureSslContextFactoryEndpointIdentification(ssl, sslConfigValues);
+
+        return ssl;
+    }
+
+    /**
+     * Configures KeyStore related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryKeyStore(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        ssl.setKeyStoreType((String) getOrDefault(sslConfigValues, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE));
+
+        String sslKeystoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+        if (sslKeystoreLocation != null)
+            ssl.setKeyStorePath(sslKeystoreLocation);
+
+        Password sslKeystorePassword = (Password) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+        if (sslKeystorePassword != null)
+            ssl.setKeyStorePassword(sslKeystorePassword.value());
+
+        Password sslKeyPassword = (Password) sslConfigValues.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+        if (sslKeyPassword != null)
+            ssl.setKeyManagerPassword(sslKeyPassword.value());
+    }
+
+    protected static Object getOrDefault(Map<String, Object> configMap, String key, Object defaultValue) {
+        if (configMap.containsKey(key))
+            return configMap.get(key);
+
+        return defaultValue;
+    }
+
+    /**
+     * Configures TrustStore related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryTrustStore(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        ssl.setTrustStoreType((String) getOrDefault(sslConfigValues, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE));
+
+        String sslTruststoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
+        if (sslTruststoreLocation != null)
+            ssl.setTrustStorePath(sslTruststoreLocation);
+
+        Password sslTruststorePassword = (Password) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
+        if (sslTruststorePassword != null)
+            ssl.setTrustStorePassword(sslTruststorePassword.value());
+    }
+
+    /**
+     * Configures Protocol, Algorithm and Provider related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryAlgorithms(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        List<String> sslEnabledProtocols = (List<String>) getOrDefault(sslConfigValues, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")));
+        ssl.setIncludeProtocols(sslEnabledProtocols.toArray(new String[sslEnabledProtocols.size()]));
+
+        String sslProvider = (String) sslConfigValues.get(SslConfigs.SSL_PROVIDER_CONFIG);
+        if (sslProvider != null)
+            ssl.setProvider(sslProvider);
+
+        ssl.setProtocol((String) getOrDefault(sslConfigValues, SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL));
+
+        List<String> sslCipherSuites = (List<String>) sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
+        if (sslCipherSuites != null)
+            ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[sslCipherSuites.size()]));
+
+        ssl.setSslKeyManagerFactoryAlgorithm((String) getOrDefault(sslConfigValues, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM));
+
+        String sslSecureRandomImpl = (String) sslConfigValues.get(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
+        if (sslSecureRandomImpl != null)
+            ssl.setSecureRandomAlgorithm(sslSecureRandomImpl);
+
+        ssl.setTrustManagerFactoryAlgorithm((String) getOrDefault(sslConfigValues, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM));
+    }
+
+    /**
+     * Configures endpoint identification related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryEndpointIdentification(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        String sslEndpointIdentificationAlg = (String) sslConfigValues.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+        if (sslEndpointIdentificationAlg != null)
+            ssl.setEndpointIdentificationAlgorithm(sslEndpointIdentificationAlg);
+    }
+
+    /**
+     * Configures Authentication related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryAuthentication(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        String sslClientAuth = (String) getOrDefault(sslConfigValues, BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+        switch (sslClientAuth) {
+            case "requested":
+                ssl.setWantClientAuth(true);
+                break;
+            case "required":
+                ssl.setNeedClientAuth(true);
+                break;
+            default:
+                ssl.setNeedClientAuth(false);
+                ssl.setWantClientAuth(false);
+        }
+    }
+}
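
Reviewer note (not part of the patch): the mapping applied by `configureSslContextFactoryAuthentication` above can be summarised with the small standalone sketch below. `ClientAuthSketch` and its `Flags` holder are hypothetical names that stand in for the two Jetty setters; the sketch assumes a freshly created factory on which both flags start out as false.

```java
// Illustrative sketch only; names are hypothetical and not part of the commit.
final class ClientAuthSketch {

    // Stand-in for the want/need client-auth flags set on the SSL context factory.
    static final class Flags {
        final boolean wantClientAuth;
        final boolean needClientAuth;

        Flags(boolean wantClientAuth, boolean needClientAuth) {
            this.wantClientAuth = wantClientAuth;
            this.needClientAuth = needClientAuth;
        }

        @Override
        public String toString() {
            return "want=" + wantClientAuth + ", need=" + needClientAuth;
        }
    }

    // Maps the client-auth setting to the two flags; a missing value is
    // treated as "none", as is any unrecognised value.
    static Flags fromConfig(String sslClientAuth) {
        String value = sslClientAuth == null ? "none" : sslClientAuth;
        switch (value) {
            case "requested":
                return new Flags(true, false);   // certificate asked for, not enforced
            case "required":
                return new Flags(false, true);   // handshake fails without a certificate
            default:
                return new Flags(false, false);  // no client authentication
        }
    }

    public static void main(String[] args) {
        System.out.println(fromConfig("requested")); // want=true, need=false
        System.out.println(fromConfig("required"));  // want=false, need=true
        System.out.println(fromConfig(null));        // want=false, need=false
    }
}
```
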
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 8c53372..d26aa04 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -16,20 +16,27 @@
  */
 package org.apache.kafka.connect.runtime.rest;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
@@ -37,12 +44,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-
 import static org.junit.Assert.assertEquals;
 
 @RunWith(PowerMockRunner.class)
@@ -59,13 +60,16 @@ public class RestServerTest {
 
     private Map<String, String> baseWorkerProps() {
         Map<String, String> workerProps = new HashMap<>();
-        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.key.converter.schemas.enable", "false");
-        workerProps.put("internal.value.converter.schemas.enable", "false");
-        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
+        workerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
+        workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+
         return workerProps;
     }
 
@@ -79,6 +83,72 @@ public class RestServerTest {
         checkCORSRequest("", "http://bar.com", null, null);
     }
 
+    @Test
+    public void testParseListeners() {
+        // Use listeners field
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+        DistributedConfig config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertArrayEquals(new String[] {"http://localhost:8080", "https://localhost:8443"}, server.parseListeners().toArray());
+
+        // Build listener from hostname and port
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
+        configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
+        config = new DistributedConfig(configMap);
+        server = new RestServer(config);
+        Assert.assertArrayEquals(new String[] {"http://my-hostname:8080"}, server.parseListeners().toArray());
+    }
+
+    @Test
+    public void testAdvertisedUri() {
+        // Advertised URI from listeners without protocol
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+        DistributedConfig config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString());
+
+        // Advertised URI from listeners with protocol
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+        configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
+        config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
+
+        // Advertised URI from listeners with only SSL available
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
+        config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
+
+        // Listener is overridden by advertised values
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
+        configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
+        configMap.put(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost");
+        configMap.put(WorkerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
+        config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
+
+        // listener from hostname and port
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
+        configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
+        config = new DistributedConfig(configMap);
+        server = new RestServer(config);
+        Assert.assertEquals("http://my-hostname:8080/", server.advertisedUrl().toString());
+    }
+
     public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) {
         // To be able to set the Origin, we need to toggle this flag
         System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
@@ -98,7 +168,7 @@ public class RestServerTest {
         Map<String, String> workerProps = baseWorkerProps();
         workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
         workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
-        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
         server = new RestServer(workerConfig);
         server.start(herder);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index ff9043e..ad360b6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -32,10 +32,11 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.TestSinkConnector;
 import org.apache.kafka.connect.runtime.TestSourceConnector;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -78,7 +79,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(RestServer.class)
+@PrepareForTest(RestClient.class)
 @PowerMockIgnore("javax.management.*")
 public class ConnectorPluginsResourceTest {
 
@@ -177,8 +178,8 @@ public class ConnectorPluginsResourceTest {
 
     @Before
     public void setUp() throws Exception {
-        PowerMock.mockStatic(RestServer.class,
-                             RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+        PowerMock.mockStatic(RestClient.class,
+                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
 
         plugins = PowerMock.createMock(Plugins.class);
         herder = PowerMock.createMock(AbstractHerder.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 89a2218..6e17349 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,9 +23,10 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
@@ -59,7 +60,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(RestServer.class)
+@PrepareForTest(RestClient.class)
 @PowerMockIgnore("javax.management.*")
 @SuppressWarnings("unchecked")
 public class ConnectorsResourceTest {
@@ -102,9 +103,9 @@ public class ConnectorsResourceTest {
 
     @Before
     public void setUp() throws NoSuchMethodException {
-        PowerMock.mockStatic(RestServer.class,
-                RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
-        connectorsResource = new ConnectorsResource(herder);
+        PowerMock.mockStatic(RestClient.class,
+                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
+        connectorsResource = new ConnectorsResource(herder, null);
     }
 
     @Test
@@ -128,9 +129,9 @@ public class ConnectorsResourceTest {
         herder.connectors(EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"),
-                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class)))
-                .andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"),
+                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(200, new HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
 
         PowerMock.replayAll();
 
@@ -177,8 +178,8 @@ public class ConnectorsResourceTest {
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(201, new HashMap<String, String>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
                     ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
@@ -242,8 +243,8 @@ public class ConnectorsResourceTest {
         herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null))
-                .andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null, null))
+                .andReturn(new RestClient.HttpResponse<>(204, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
@@ -445,9 +446,9 @@ public class ConnectorsResourceTest {
         herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
 
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
@@ -463,9 +464,9 @@ public class ConnectorsResourceTest {
         String ownerUrl = "http://owner:8083";
         expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
 
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
@@ -496,9 +497,9 @@ public class ConnectorsResourceTest {
         herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
 
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
@@ -516,9 +517,9 @@ public class ConnectorsResourceTest {
         String ownerUrl = "http://owner:8083";
         expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
 
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
new file mode 100644
index 0000000..422b2b0
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.util;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SSLUtilsTest {
+    private static final Map<String, String> DEFAULT_CONFIG = new HashMap<>();
+    static {
+        // The WorkerConfig base class has some required settings without defaults
+        DEFAULT_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        DEFAULT_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
+        DEFAULT_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        DEFAULT_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
+        DEFAULT_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+    }
+
+    @Test
+    public void testGetOrDefault() {
+        String existingKey = "exists";
+        String missingKey = "missing";
+        String value = "value";
+        String defaultValue = "default";
+        Map<String, Object> map = new HashMap<>();
+        map.put(existingKey, value);
+
+        Assert.assertEquals(value, SSLUtils.getOrDefault(map, existingKey, defaultValue));
+        Assert.assertEquals(defaultValue, SSLUtils.getOrDefault(map, missingKey, defaultValue));
+    }
+
+    @Test
+    public void testCreateSslContextFactory() {
+        Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+        configMap.put("ssl.keystore.location", "/path/to/keystore");
+        configMap.put("ssl.keystore.password", "123456");
+        configMap.put("ssl.key.password", "123456");
+        configMap.put("ssl.truststore.location", "/path/to/truststore");
+        configMap.put("ssl.truststore.password", "123456");
+        configMap.put("ssl.provider", "SunJSSE");
+        configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
+        configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
+        configMap.put("ssl.client.auth", "required");
+        configMap.put("ssl.endpoint.identification.algorithm", "HTTPS");
+        configMap.put("ssl.keystore.type", "JKS");
+        configMap.put("ssl.protocol", "TLS");
+        configMap.put("ssl.truststore.type", "JKS");
+        configMap.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1");
+        configMap.put("ssl.keymanager.algorithm", "SunX509");
+        configMap.put("ssl.trustmanager.algorithm", "PKIX");
+
+        DistributedConfig config = new DistributedConfig(configMap);
+        SslContextFactory ssl = SSLUtils.createSslContextFactory(config);
+
+        Assert.assertEquals("/path/to/keystore", ssl.getKeyStorePath());
+        Assert.assertEquals("/path/to/truststore", ssl.getTrustStore());
+        Assert.assertEquals("SunJSSE", ssl.getProvider());
+        Assert.assertArrayEquals(new String[] {"SSL_RSA_WITH_RC4_128_SHA", "SSL_RSA_WITH_RC4_128_MD5"}, ssl.getIncludeCipherSuites());
+        Assert.assertEquals("SHA1PRNG", ssl.getSecureRandomAlgorithm());
+        Assert.assertTrue(ssl.getNeedClientAuth());
+        Assert.assertEquals("JKS", ssl.getKeyStoreType());
+        Assert.assertEquals("JKS", ssl.getTrustStoreType());
+        Assert.assertEquals("TLS", ssl.getProtocol());
+        Assert.assertArrayEquals(new String[] {"TLSv1.2", "TLSv1.1", "TLSv1"}, ssl.getIncludeProtocols());
+        Assert.assertEquals("SunX509", ssl.getSslKeyManagerFactoryAlgorithm());
+        Assert.assertEquals("PKIX", ssl.getTrustManagerFactoryAlgorithm());
+    }
+
+    @Test
+    public void testCreateSslContextFactoryDefaultValues() {
+        Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+        configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file");
+        configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        configMap.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        configMap.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        configMap.put("ssl.keystore.location", "/path/to/keystore");
+        configMap.put("ssl.keystore.password", "123456");
+        configMap.put("ssl.key.password", "123456");
+        configMap.put("ssl.truststore.location", "/path/to/truststore");
+        configMap.put("ssl.truststore.password", "123456");
+        configMap.put("ssl.provider", "SunJSSE");
+        configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
+        configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
+
+        DistributedConfig config = new DistributedConfig(configMap);
+        SslContextFactory ssl = SSLUtils.createSslContextFactory(config);
+
+        Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType());
+        Assert.assertEquals(SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ssl.getTrustStoreType());
+        Assert.assertEquals(SslConfigs.DEFAULT_SSL_PROTOCOL, ssl.getProtocol());
+        Assert.assertArrayEquals(Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")).toArray(), ssl.getIncludeProtocols());
+        Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ssl.getSslKeyManagerFactoryAlgorithm());
+        Assert.assertEquals(SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ssl.getTrustManagerFactoryAlgorithm());
+        Assert.assertFalse(ssl.getNeedClientAuth());
+        Assert.assertFalse(ssl.getWantClientAuth());
+    }
+}
diff --git a/docs/connect.html b/docs/connect.html
index b910cf5..1230a7a 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -187,7 +187,43 @@
 
     <h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
 
-    <p>Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083. The following are the currently supported endpoints:</p>
+    <p>Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. The REST API server can be configured using the <code>listeners</code> configuration option.
+        This field should contain a list of listeners in the following format: <code>protocol://host:port,protocol2://host2:port2</code>. Currently supported protocols are <code>http</code> and <code>https</code>.
+        For example:</p>
+
+    <pre class="brush: text;">
+        listeners=http://localhost:8080,https://localhost:8443
+    </pre>
+
+    <p>By default, if no <code>listeners</code> are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS, the configuration has to include the SSL configuration.
+    By default, it will use the <code>ssl.*</code> settings. If the REST API needs a different configuration than the one used for connecting to Kafka brokers, the fields can be prefixed with <code>listeners.https</code>.
+        When using the prefix, only the prefixed options will be used and the <code>ssl.*</code> options without the prefix will be ignored. The following fields can be used to configure HTTPS for the REST API:</p>
+
+    <ul>
+        <li><code>ssl.keystore.location</code></li>
+        <li><code>ssl.keystore.password</code></li>
+        <li><code>ssl.keystore.type</code></li>
+        <li><code>ssl.key.password</code></li>
+        <li><code>ssl.truststore.location</code></li>
+        <li><code>ssl.truststore.password</code></li>
+        <li><code>ssl.truststore.type</code></li>
+        <li><code>ssl.enabled.protocols</code></li>
+        <li><code>ssl.provider</code></li>
+        <li><code>ssl.protocol</code></li>
+        <li><code>ssl.cipher.suites</code></li>
+        <li><code>ssl.keymanager.algorithm</code></li>
+        <li><code>ssl.secure.random.implementation</code></li>
+        <li><code>ssl.trustmanager.algorithm</code></li>
+        <li><code>ssl.endpoint.identification.algorithm</code></li>
+        <li><code>ssl.client.auth</code></li>
+    </ul>
+
+    <p>The REST API is used not only by users to monitor and manage Kafka Connect. It is also used for the Kafka Connect cross-cluster communication. Requests received on a follower node's REST API will be forwarded to the leader node's REST API.
+    If the URI under which a given host is reachable differs from the URI it listens on, the configuration options <code>rest.advertised.host.name</code>, <code>rest.advertised.port</code> and <code>rest.advertised.listener</code>
+    can be used to change the URI which will be used by the follower nodes to connect with the leader. When using both HTTP and HTTPS listeners, the <code>rest.advertised.listener</code> option can also be used to define which listener
+        will be used for the cross-cluster communication. When using HTTPS for communication between nodes, the same <code>ssl.*</code> or <code>listeners.https</code> options will be used to configure the HTTPS client.</p>
+
+    <p>The following are the currently supported REST API endpoints:</p>
 
     <ul>
         <li><code>GET /connectors</code> - return a list of active connectors</li>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 0f395b8..963fd03 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -92,6 +92,7 @@ libs += [
   jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
   jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
   jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty",
+  jettyClient: "org.eclipse.jetty:jetty-client:$versions.jetty",
   jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty",
   jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
   jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
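
Note on the `docs/connect.html` change above: the "only the prefixed options will be used" rule can be illustrated with a small standalone sketch. This is not the code from this commit. The class `PrefixedSslSettings` and the method `selectSslSettings` are hypothetical names, and the full key form `listeners.https.ssl.*` is an assumption derived from the documented `listeners.https` prefix.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; hypothetical names, not the Kafka Connect implementation.
public class PrefixedSslSettings {

    // Returns the ssl.* settings that would apply to the REST API:
    // if at least one key carries the prefix, only prefixed keys are used
    // (with the prefix stripped); otherwise the plain ssl.* keys are used.
    static Map<String, String> selectSslSettings(Map<String, String> workerProps, String prefix) {
        Map<String, String> prefixed = new HashMap<>();
        Map<String, String> plain = new HashMap<>();
        for (Map.Entry<String, String> entry : workerProps.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix + "ssl.")) {
                prefixed.put(key.substring(prefix.length()), entry.getValue());
            } else if (key.startsWith("ssl.")) {
                plain.put(key, entry.getValue());
            }
        }
        // All or nothing: prefixed and non-prefixed options are never mixed.
        return prefixed.isEmpty() ? plain : prefixed;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("ssl.keystore.location", "/path/to/broker-keystore");
        props.put("ssl.truststore.location", "/path/to/broker-truststore");
        // Assumed key form: documented prefix followed by the ssl.* option name.
        props.put("listeners.https.ssl.keystore.location", "/path/to/rest-keystore");

        Map<String, String> restSsl = selectSslSettings(props, "listeners.https.");
        System.out.println("keystore: " + restSsl.get("ssl.keystore.location"));
        System.out.println("truststore: " + restSsl.get("ssl.truststore.location"));
    }
}
```

In this sketch the non-prefixed truststore setting is dropped as soon as any prefixed option is present, which matches the documented behaviour that the `ssl.*` options without the prefix are ignored.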

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.
