From commits-return-9584-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed May 30 06:36:15 2018
Return-Path:
X-Original-To: archive-asf-public@cust-asf.ponee.io
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by mx-eu-01.ponee.io (Postfix) with SMTP id E5D4618063B
for ; Wed, 30 May 2018 06:36:12 +0200 (CEST)
Received: (qmail 32108 invoked by uid 500); 30 May 2018 04:36:11 -0000
Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@kafka.apache.org
Delivered-To: mailing list commits@kafka.apache.org
Received: (qmail 32099 invoked by uid 99); 30 May 2018 04:36:11 -0000
Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70)
by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 May 2018 04:36:11 +0000
Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33)
id BF1618081F; Wed, 30 May 2018 04:36:10 +0000 (UTC)
Date: Wed, 30 May 2018 04:36:10 +0000
To: "commits@kafka.apache.org"
Subject: [kafka] branch trunk updated: KAFKA-6776: ConnectRestExtension
Interfaces & Rest integration (KIP-285)
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
Message-ID: <152765497000.11794.14412370610330420440@gitbox.apache.org>
From: ewencp@apache.org
X-Git-Host: gitbox.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/trunk
X-Git-Reftype: branch
X-Git-Oldrev: fffb9c5b5cac1a669f22dd99860774d6c0fdb94b
X-Git-Newrev: 98094954a27849b027831a7401e965c6a949790c
X-Git-Rev: 98094954a27849b027831a7401e965c6a949790c
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9809495 KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285)
9809495 is described below
commit 98094954a27849b027831a7401e965c6a949790c
Author: Magesh Nandakumar
AuthorDate: Tue May 29 21:35:22 2018 -0700
KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285)
This PR provides the implementation for KIP-285 and also a reference implementation for authenticating BasicAuth credentials using JAAS LoginModule
Author: Magesh Nandakumar
Reviewers: Randall Hauch , Arjun Satish , Konstantine Karantasis , Ewen Cheslack-Postava
Closes #4931 from mageshn/KIP-285
---
build.gradle | 40 +++++
checkstyle/import-control.xml | 14 +-
.../apache/kafka/connect/components/Versioned.java | 30 ++++
.../apache/kafka/connect/connector/Connector.java | 9 +-
.../apache/kafka/connect/health/AbstractState.java | 74 +++++++++
.../kafka/connect/health/ConnectClusterState.java | 46 ++++++
.../kafka/connect/health/ConnectorHealth.java | 86 +++++++++++
.../kafka/connect/health/ConnectorState.java | 35 +++++
.../apache/kafka/connect/health/ConnectorType.java | 43 ++++++
.../org/apache/kafka/connect/health/TaskState.java | 69 +++++++++
.../kafka/connect/rest/ConnectRestExtension.java | 58 +++++++
.../connect/rest/ConnectRestExtensionContext.java | 44 ++++++
.../extenstion/BasicAuthSecurityRestExtension.java | 79 ++++++++++
.../basic/auth/extenstion/JaasBasicAuthFilter.java | 100 ++++++++++++
.../auth/extenstion/PropertyFileLoginModule.java | 116 ++++++++++++++
....apache.kafka.connect.rest.ConnectRestExtension | 16 ++
.../auth/extenstion/JaasBasicAuthFilterTest.java | 168 +++++++++++++++++++++
.../apache/kafka/connect/runtime/WorkerConfig.java | 11 +-
.../runtime/health/ConnectClusterStateImpl.java | 86 +++++++++++
.../runtime/isolation/DelegatingClassLoader.java | 44 ++++--
.../runtime/isolation/PluginScanResult.java | 13 +-
.../connect/runtime/isolation/PluginType.java | 2 +
.../kafka/connect/runtime/isolation/Plugins.java | 49 ++++++
.../runtime/rest/ConnectRestConfigurable.java | 138 +++++++++++++++++
.../rest/ConnectRestExtensionContextImpl.java | 47 ++++++
.../kafka/connect/runtime/rest/RestServer.java | 39 ++++-
.../connect/runtime/WorkerSourceTaskTest.java | 3 +-
.../connect/runtime/isolation/PluginsTest.java | 47 ++++++
.../kafka/connect/runtime/rest/RestServerTest.java | 32 ++--
....apache.kafka.connect.rest.ConnectRestExtension | 16 ++
settings.gradle | 2 +-
31 files changed, 1519 insertions(+), 37 deletions(-)
diff --git a/build.gradle b/build.gradle
index 474329a..4f3fd77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1207,6 +1207,7 @@ project(':connect:api') {
dependencies {
compile project(':clients')
compile libs.slf4jApi
+ compile libs.jerseyContainerServlet
testCompile libs.junit
@@ -1431,6 +1432,45 @@ project(':connect:file') {
}
}
+project(':connect:basic-auth-extension') {
+ archivesBaseName = "connect-basic-auth-extension"
+
+ dependencies {
+ compile project(':connect:api')
+ compile libs.slf4jApi
+
+ testCompile libs.bcpkix
+ testCompile libs.easymock
+ testCompile libs.junit
+ testCompile libs.powermockJunit4
+ testCompile libs.powermockEasymock
+ testCompile project(':clients').sourceSets.test.output
+
+ testRuntime libs.slf4jlog4j
+ }
+
+ javadoc {
+ enabled = false
+ }
+
+ tasks.create(name: "copyDependantLibs", type: Copy) {
+ from (configurations.testRuntime) {
+ include('slf4j-log4j12*')
+ include('log4j*jar')
+ }
+ from (configurations.runtime) {
+ exclude('kafka-clients*')
+ exclude('connect-*')
+ }
+ into "$buildDir/dependant-libs"
+ duplicatesStrategy 'exclude'
+ }
+
+ jar {
+ dependsOn copyDependantLibs
+ }
+}
+
task aggregatedJavadoc(type: Javadoc) {
def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e4f9a4e..5549205 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -290,6 +290,7 @@
+
@@ -307,7 +308,16 @@
-
+
+
+
+
+
+
+
+
+
+
@@ -327,6 +337,8 @@
+
+
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java
new file mode 100644
index 0000000..adabe8f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.components;
+
+/**
+ * Connect requires some components implement this interface to define a version string.
+ */
+public interface Versioned {
+ /**
+ * Get the version of this component.
+ *
+ * @return the version, formatted as a String. The version may not be (@code null} or empty.
+ */
+ String version();
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 30dfd3c..329429b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.components.Versioned;
import java.util.List;
import java.util.Map;
@@ -41,16 +42,10 @@ import java.util.Map;
* Tasks.
*
*/
-public abstract class Connector {
+public abstract class Connector implements Versioned {
protected ConnectorContext context;
- /**
- * Get the version of this connector.
- *
- * @return the version, formatted as a String
- */
- public abstract String version();
/**
* Initialize this connector, using the provided ConnectorContext to notify the runtime of
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
new file mode 100644
index 0000000..a18c463
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Provides the current status along with identifier for Connect worker and tasks.
+ */
+public abstract class AbstractState {
+
+ private final String state;
+ private final String traceMessage;
+ private final String workerId;
+
+ /**
+ * Construct a state for connector or task.
+ *
+ * @param state the status of connector or task; may not be null or empty
+ * @param workerId the workerId associated with the connector or the task; may not be null or empty
+ * @param traceMessage any error trace message associated with the connector or the task; may be null or empty
+ */
+ public AbstractState(String state, String workerId, String traceMessage) {
+ if (state != null && !state.trim().isEmpty()) {
+ throw new IllegalArgumentException("State must not be null or empty");
+ }
+ if (workerId != null && !workerId.trim().isEmpty()) {
+ throw new IllegalArgumentException("Worker ID must not be null or empty");
+ }
+ this.state = state;
+ this.workerId = workerId;
+ this.traceMessage = traceMessage;
+ }
+
+ /**
+ * Provides the current state of the connector or task.
+ *
+ * @return state, never {@code null} or empty
+ */
+ public String state() {
+ return state;
+ }
+
+ /**
+ * The identifier of the worker associated with the connector or the task.
+ *
+ * @return workerId, never {@code null} or empty.
+ */
+ public String workerId() {
+ return workerId;
+ }
+
+ /**
+ * The error message associated with the connector or task.
+ *
+ * @return traceMessage, can be {@code null} or empty.
+ */
+ public String traceMessage() {
+ return traceMessage;
+ }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
new file mode 100644
index 0000000..d4292ef
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+import java.util.Collection;
+
+/**
+ * Provides the ability to lookup connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension}
+ * implementations. The Connect framework provides the implementation for this interface.
+ */
+public interface ConnectClusterState {
+
+ /**
+ * Get the names of the connectors currently deployed in this cluster. This is a full list of connectors in the cluster gathered from
+ * the current configuration, which may change over time.
+ *
+ * @return collection of connector names, never {@code null}
+ */
+ Collection connectors();
+
+ /**
+ * Lookup the current health of a connector and its tasks. This provides the current snapshot of health by querying the underlying
+ * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
+ * org.apache.kafka.connect.errors.NotFoundException}.
+ *
+ * @param connName name of the connector
+ * @return the health of the connector for the connector name
+ * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
+ */
+ ConnectorHealth connectorHealth(String connName);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
new file mode 100644
index 0000000..3a9efd1
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.health;
+
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Provides basic health information about the connector and its tasks.
+ */
+public class ConnectorHealth {
+
+ private final String name;
+ private final ConnectorState connectorState;
+ private final Map tasks;
+ private final ConnectorType type;
+
+
+ public ConnectorHealth(String name,
+ ConnectorState connectorState,
+ Map tasks,
+ ConnectorType type) {
+ if (name != null && !name.trim().isEmpty()) {
+ throw new IllegalArgumentException("Connector name is required");
+ }
+ Objects.requireNonNull(connectorState, "connectorState can't be null");
+ Objects.requireNonNull(tasks, "tasks can't be null");
+ Objects.requireNonNull(type, "type can't be null");
+ this.name = name;
+ this.connectorState = connectorState;
+ this.tasks = tasks;
+ this.type = type;
+ }
+
+ /**
+ * Provides the name of the connector.
+ *
+ * @return name, never {@code null} or empty
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Provides the current state of the connector.
+ *
+ * @return the connector state, never {@code null}
+ */
+ public ConnectorState connectorState() {
+ return connectorState;
+ }
+
+ /**
+ * Provides the current state of the connector tasks.
+ *
+ * @return the state for each task ID; never {@code null}
+ */
+ public Map tasksState() {
+ return tasks;
+ }
+
+ /**
+ * Provides the type of the connector.
+ *
+ * @return type, never {@code null}
+ */
+ public ConnectorType type() {
+ return type;
+ }
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
new file mode 100644
index 0000000..d5571bc
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Describes the status, worker ID, and any errors associated with a connector.
+ */
+public class ConnectorState extends AbstractState {
+
+ /**
+ * Provides an instance of the ConnectorState.
+ *
+ * @param state - the status of connector, may not be {@code null} or empty
+ * @param workerId - the workerId associated with the connector, may not be {@code null} or empty
+ * @param traceMessage - any error message associated with the connector, may be {@code null} or empty
+ */
+ public ConnectorState(String state, String workerId, String traceMessage) {
+ super(state, workerId, traceMessage);
+ }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java
new file mode 100644
index 0000000..fa9db6f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.health;
+
+import java.util.Locale;
+
+/**
+ * Enum definition that identifies the type of the connector.
+ */
+public enum ConnectorType {
+ /**
+ * Identifies a source connector
+ */
+ SOURCE,
+ /**
+ * Identifies a sink connector
+ */
+ SINK,
+ /**
+ * Identifies a connector whose type could not be inferred
+ */
+ UNKNOWN;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
new file mode 100644
index 0000000..1c1be15
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+import java.util.Objects;
+
+/**
+ * Describes the state, IDs, and any errors of a connector task.
+ */
+public class TaskState extends AbstractState {
+
+ private final int taskId;
+
+ /**
+ * Provides an instance of {@link TaskState}.
+ *
+ * @param taskId the id associated with the connector task
+ * @param state the status of the task, may not be {@code null} or empty
+ * @param workerId id of the worker the task is associated with, may not be {@code null} or empty
+ * @param trace error message if that task had failed or errored out, may be {@code null} or empty
+ */
+ public TaskState(int taskId, String state, String workerId, String trace) {
+ super(state, workerId, trace);
+ this.taskId = taskId;
+ }
+
+ /**
+ * Provides the ID of the task.
+ *
+ * @return the task ID
+ */
+ public int taskId() {
+ return taskId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TaskState taskState = (TaskState) o;
+
+ return taskId == taskState.taskId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(taskId);
+ }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
new file mode 100644
index 0000000..aa479a3
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.health.ConnectClusterState;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * A plugin interface to allow registration of new JAX-RS resources like Filters, REST endpoints, providers, etc. The implementations will
+ * be discovered using the standard Java {@link java.util.ServiceLoader} mechanism by Connect's plugin class loading mechanism.
+ *
+ *
The extension class(es) must be packaged as a plugin, with one JAR containing the implementation classes and a {@code
+ * META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} file that contains the fully qualified name of the
+ * class(es) that implement the ConnectRestExtension interface. The plugin should also include the JARs of all dependencies except those
+ * already provided by the Connect framework.
+ *
+ *
To install into a Connect installation, add a directory named for the plugin and containing the plugin's JARs into a directory that is
+ * on Connect's {@code plugin.path}, and (re)start the Connect worker.
+ *
+ *
When the Connect worker process starts up, it will read its configuration and instantiate all of the REST extension implementation
+ * classes that are specified in the `rest.extension.classes` configuration property. Connect will then pass its configuration to each
+ * extension via the {@link Configurable#configure(Map)} method, and will then call {@link #register} with a provided context.
+ *
+ *
When the Connect worker shuts down, it will call the extension's {@link #close} method to allow the implementation to release all of
+ * its resources.
+ */
+public interface ConnectRestExtension extends Configurable, Versioned, Closeable {
+
+ /**
+ * ConnectRestExtension implementations can register custom JAX-RS resources via the {@link #register(ConnectRestExtensionContext)}
+ * method. The Connect framework will invoke this method after registering the default Connect resources. If the implementations attempt
+ * to re-register any of the Connect resources, it will be be ignored and will be logged.
+ *
+ * @param restPluginContext The context provides access to JAX-RS {@link javax.ws.rs.core.Configurable} and {@link
+ * ConnectClusterState}.The custom JAX-RS resources can be registered via the {@link
+ * ConnectRestExtensionContext#configurable()}
+ */
+ void register(ConnectRestExtensionContext restPluginContext);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
new file mode 100644
index 0000000..7608597
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+
+import javax.ws.rs.core.Configurable;
+
+/**
+ * The interface provides the ability for {@link ConnectRestExtension} implementations to access the JAX-RS
+ * {@link javax.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided
+ * by the Connect framework.
+ */
+public interface ConnectRestExtensionContext {
+
+ /**
+ * Provides an implementation of {@link javax.ws.rs.core.Configurable} that be used to register JAX-RS resources.
+ *
+ * @return @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null}
+ */
+ Configurable extends Configurable> configurable();
+
+ /**
+ * Provides the cluster state and health information about the connectors and tasks.
+ *
+ * @return the cluster state information; never {@code null}
+ */
+ ConnectClusterState clusterState();
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
new file mode 100644
index 0000000..91d5d9c
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link
+ * javax.security.auth.spi.LoginModule}. An entry with the name {@code KafkaConnect} is expected in the JAAS config file configured in the
+ * JVM. An implementation of {@link javax.security.auth.spi.LoginModule} needs to be provided in the JAAS config file. The {@code
+ * LoginModule} implementation should configure the {@link javax.security.auth.callback.CallbackHandler} with only {@link
+ * javax.security.auth.callback.NameCallback} and {@link javax.security.auth.callback.PasswordCallback}.
+ *
+ *
To use this extension, one needs to add the following config in the {@code worker.properties}
+ *
This is a reference implementation of the {@link ConnectRestExtension} interface. It registers an implementation of {@link
+ * javax.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link
+ * ConnectRestExtension} implementations are loaded via the plugin class loader using {@link java.util.ServiceLoader} mechanism and hence
+ * the packaged jar includes {@code META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} with the entry
+ * {@code org.apache.kafka.connect.extension.auth.jaas.BasicAuthSecurityRestExtension}
+ *
+ *
NOTE: The implementation ships with a default {@link PropertyFileLoginModule} that helps authenticate the request against a
+ * property file. {@link PropertyFileLoginModule} is NOT intended to be used in production since the credentials are stored in PLAINTEXT. One can use
+ * this extension in production by using their own implementation of {@link javax.security.auth.spi.LoginModule} that authenticates against
+ * stores like LDAP, DB, etc.
+ */
+public class BasicAuthSecurityRestExtension implements ConnectRestExtension {
+
+ @Override
+ public void register(ConnectRestExtensionContext restPluginContext) {
+ restPluginContext.configurable().register(JaasBasicAuthFilter.class);
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void configure(Map configs) {
+
+ }
+
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
new file mode 100644
index 0000000..7231700
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerRequestFilter;
+import javax.ws.rs.core.Response;
+
+public class JaasBasicAuthFilter implements ContainerRequestFilter {
+
+ private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
+ static final String AUTHORIZATION = "Authorization";
+
+ @Override
+ public void filter(ContainerRequestContext requestContext) throws IOException {
+
+ try {
+ LoginContext loginContext =
+ new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
+ requestContext.getHeaderString(AUTHORIZATION)));
+ loginContext.login();
+ } catch (LoginException | ConfigException e) {
+ requestContext.abortWith(
+ Response.status(Response.Status.UNAUTHORIZED)
+ .entity("User cannot access the resource.")
+ .build());
+ }
+ }
+
+
+ public static class BasicAuthCallBackHandler implements CallbackHandler {
+
+ private static final String BASIC = "basic";
+ private static final char COLON = ':';
+ private static final char SPACE = ' ';
+ private String username;
+ private String password;
+
+ public BasicAuthCallBackHandler(String credentials) {
+ if (credentials != null) {
+ int space = credentials.indexOf(SPACE);
+ if (space > 0) {
+ String method = credentials.substring(0, space);
+ if (BASIC.equalsIgnoreCase(method)) {
+ credentials = credentials.substring(space + 1);
+ credentials = new String(Base64.getDecoder().decode(credentials),
+ StandardCharsets.UTF_8);
+ int i = credentials.indexOf(COLON);
+ if (i > 0) {
+ username = credentials.substring(0, i);
+ password = credentials.substring(i + 1);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ ((NameCallback) callback).setName(username);
+ } else if (callback instanceof PasswordCallback) {
+ ((PasswordCallback) callback).setPassword(password.toCharArray());
+ } else {
+ throw new UnsupportedCallbackException(callback, "Supports only NameCallback "
+ + "and PasswordCallback");
+ }
+ }
+ }
+ }
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
new file mode 100644
index 0000000..7af7863
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+/**
+ * {@link PropertyFileLoginModule} authenticates against a properties file.
+ * The credentials should be stored in the format {username}={password} in the properties file.
+ * The absolute path of the file needs to specified using the option file
+ *
+ *
NOTE: This implementation is NOT intended to be used in production since the credentials are stored in PLAINTEXT in the
+ * properties file.
+ */
+public class PropertyFileLoginModule implements LoginModule {
+ private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class);
+
+ private CallbackHandler callbackHandler;
+ private static final String FILE_OPTIONS = "file";
+ private String fileName;
+ private boolean authenticated;
+
+ private static Map credentialPropertiesMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void initialize(Subject subject, CallbackHandler callbackHandler, Map sharedState, Map options) {
+ this.callbackHandler = callbackHandler;
+ fileName = (String) options.get(FILE_OPTIONS);
+ if (fileName == null || fileName.trim().isEmpty()) {
+ throw new ConfigException("Property Credentials file must be specified");
+ }
+ if (!credentialPropertiesMap.containsKey(fileName)) {
+ Properties credentialProperties = new Properties();
+ try {
+ credentialProperties.load(Files.newInputStream(Paths.get(fileName)));
+ credentialPropertiesMap.putIfAbsent(fileName, credentialProperties);
+ } catch (IOException e) {
+ log.error("Error loading credentials file ", e);
+ throw new ConfigException("Error loading Property Credentials file");
+ }
+ }
+ }
+
+ @Override
+ public boolean login() throws LoginException {
+ Callback[] callbacks = configureCallbacks();
+ try {
+ callbackHandler.handle(callbacks);
+ } catch (Exception e) {
+ throw new LoginException(e.getMessage());
+ }
+
+ String username = ((NameCallback) callbacks[0]).getName();
+ char[] passwordChars = ((PasswordCallback) callbacks[1]).getPassword();
+ String password = passwordChars != null ? new String(passwordChars) : null;
+ Properties credentialProperties = credentialPropertiesMap.get(fileName);
+ authenticated = credentialProperties.isEmpty() ||
+ (password != null && password.equals(credentialProperties.get(username)));
+ return authenticated;
+ }
+
+ @Override
+ public boolean commit() throws LoginException {
+ return authenticated;
+ }
+
+ @Override
+ public boolean abort() throws LoginException {
+ return true;
+ }
+
+ @Override
+ public boolean logout() throws LoginException {
+ return true;
+ }
+
+ private Callback[] configureCallbacks() {
+
+ Callback[] callbacks = new Callback[2];
+ callbacks[0] = new NameCallback("Enter user name");
+ callbacks[1] = new PasswordCallback("Enter password", false);
+ return callbacks;
+ }
+}
diff --git a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
new file mode 100644
index 0000000..098c947
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
\ No newline at end of file
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
new file mode 100644
index 0000000..80299f8
--- /dev/null
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.security.JaasUtils;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import javax.security.auth.login.Configuration;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Response;
+
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.*")
+public class JaasBasicAuthFilterTest {
+
+ @MockStrict
+ private ContainerRequestContext requestContext;
+
+ private JaasBasicAuthFilter jaasBasicAuthFilter = new JaasBasicAuthFilter();
+ private String previousJaasConfig;
+ private Configuration previousConfiguration;
+
+ @Before
+ public void setup() throws IOException {
+ EasyMock.reset(requestContext);
+ }
+
+ @After
+ public void tearDown() {
+ if (previousJaasConfig != null) {
+ System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, previousJaasConfig);
+ }
+ Configuration.setConfiguration(previousConfiguration);
+ }
+
+ @Test
+ public void testSuccess() throws IOException {
+ File credentialFile = File.createTempFile("credential", ".properties");
+ credentialFile.deleteOnExit();
+ List lines = new ArrayList<>();
+ lines.add("user=password");
+ lines.add("user1=password1");
+ Files.write(credentialFile.toPath(), lines, StandardCharsets.UTF_8);
+
+ setupJaasConfig("KafkaConnect", credentialFile.getPath(), true);
+ setMock("Basic", "user", "password", false);
+
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+
+ @Test
+ public void testBadCredential() throws IOException {
+ setMock("Basic", "user1", "password", true);
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+ @Test
+ public void testBadPassword() throws IOException {
+ setMock("Basic", "user", "password1", true);
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+ @Test
+ public void testUnknownBearer() throws IOException {
+ setMock("Unknown", "user", "password", true);
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+ @Test
+ public void testUnknownLoginModule() throws IOException {
+ setupJaasConfig("KafkaConnect1", "/tmp/testcrednetial", true);
+ Configuration.setConfiguration(null);
+ setMock("Basic", "user", "password", true);
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+ @Test
+ public void testUnknownCredentialsFile() throws IOException {
+ setupJaasConfig("KafkaConnect", "/tmp/testcrednetial", true);
+ Configuration.setConfiguration(null);
+ setMock("Basic", "user", "password", true);
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+ @Test
+ public void testEmptyCredentialsFile() throws IOException {
+ File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+ jaasConfigFile.deleteOnExit();
+ System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+ setupJaasConfig("KafkaConnect", "", true);
+ Configuration.setConfiguration(null);
+ setMock("Basic", "user", "password", true);
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+ @Test
+ public void testNoFileOption() throws IOException {
+ File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+ jaasConfigFile.deleteOnExit();
+ System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+ setupJaasConfig("KafkaConnect", "", false);
+ Configuration.setConfiguration(null);
+ setMock("Basic", "user", "password", true);
+ jaasBasicAuthFilter.filter(requestContext);
+ }
+
+ private void setMock(String authorization, String username, String password, boolean exceptionCase) {
+ String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
+ EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
+ .andReturn(authHeader);
+ if (exceptionCase) {
+ requestContext.abortWith(EasyMock.anyObject(Response.class));
+ EasyMock.expectLastCall();
+ }
+ replayAll();
+ }
+
+ private void setupJaasConfig(String loginModule, String credentialFilePath, boolean includeFileOptions) throws IOException {
+ File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+ jaasConfigFile.deleteOnExit();
+ previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+
+ List lines;
+ lines = new ArrayList<>();
+ lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required ");
+ if (includeFileOptions) {
+ lines.add("file=\"" + credentialFilePath + "\"");
+ }
+ lines.add(";};");
+ Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
+ previousConfiguration = Configuration.getConfiguration();
+ Configuration.setConfiguration(null);
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index c9c32e7..3c76d0f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -190,6 +190,13 @@ public class WorkerConfig extends AbstractConfig {
+ "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
+ "/opt/connectors";
+ public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
+ protected static final String REST_EXTENSION_CLASSES_DOC =
+ "Comma-separated names of ConnectRestExtension classes, loaded and called "
+ + "in the order specified. Implementing the interface "
+ + "ConnectRestExtension allows you to inject into Connect's REST API user defined resources like filters. "
+ + "Typically used to add custom capability like logging, security, etc.";
+
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
@@ -254,7 +261,9 @@ public class WorkerConfig extends AbstractConfig {
ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
- Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
+ Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
+ .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
+ Importance.LOW, REST_EXTENSION_CLASSES_DOC);
}
private void logInternalConverterDeprecationWarnings(Map props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
new file mode 100644
index 0000000..a0f7fde
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.health;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+import org.apache.kafka.connect.health.ConnectorHealth;
+import org.apache.kafka.connect.health.ConnectorState;
+import org.apache.kafka.connect.health.ConnectorType;
+import org.apache.kafka.connect.health.TaskState;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.Callback;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConnectClusterStateImpl implements ConnectClusterState {
+
+ private Herder herder;
+
+ public ConnectClusterStateImpl(Herder herder) {
+ this.herder = herder;
+ }
+
+ @Override
+ public Collection connectors() {
+ final Collection connectors = new ArrayList<>();
+ herder.connectors(new Callback>() {
+ @Override
+ public void onCompletion(Throwable error, Collection result) {
+ connectors.addAll(result);
+ }
+ });
+ return connectors;
+ }
+
+ @Override
+ public ConnectorHealth connectorHealth(String connName) {
+
+ ConnectorStateInfo state = herder.connectorStatus(connName);
+ ConnectorState connectorState = new ConnectorState(
+ state.connector().state(),
+ state.connector().workerId(),
+ state.connector().trace()
+ );
+ Map taskStates = taskStates(state.tasks());
+ ConnectorHealth connectorHealth = new ConnectorHealth(
+ connName,
+ connectorState,
+ taskStates,
+ ConnectorType.valueOf(state.type().name())
+ );
+ return connectorHealth;
+ }
+
+ private Map taskStates(List states) {
+
+ Map taskStates = new HashMap<>();
+
+ for (ConnectorStateInfo.TaskState state : states) {
+ taskStates.put(
+ state.id(),
+ new TaskState(state.id(), state.workerId(), state.state(), state.trace())
+ );
+ }
+ return taskStates;
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index c67dfb5..b56bd1a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
@@ -64,6 +66,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private final SortedSet> converters;
private final SortedSet> headerConverters;
private final SortedSet> transformations;
+ private final SortedSet> restExtensions;
private final List pluginPaths;
private final Map activePaths;
@@ -77,6 +80,7 @@ public class DelegatingClassLoader extends URLClassLoader {
this.converters = new TreeSet<>();
this.headerConverters = new TreeSet<>();
this.transformations = new TreeSet<>();
+ this.restExtensions = new TreeSet<>();
}
public DelegatingClassLoader(List pluginPaths) {
@@ -99,6 +103,10 @@ public class DelegatingClassLoader extends URLClassLoader {
return transformations;
}
+ public Set> restExtensions() {
+ return restExtensions;
+ }
+
public ClassLoader connectorLoader(Connector connector) {
return connectorLoader(connector.getClass().getName());
}
@@ -228,6 +236,8 @@ public class DelegatingClassLoader extends URLClassLoader {
headerConverters.addAll(plugins.headerConverters());
addPlugins(plugins.transformations(), loader);
transformations.addAll(plugins.transformations());
+ addPlugins(plugins.restExtensions(), loader);
+ restExtensions.addAll(plugins.restExtensions());
}
loadJdbcDrivers(loader);
@@ -281,7 +291,8 @@ public class DelegatingClassLoader extends URLClassLoader {
getPluginDesc(reflections, Connector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
- getPluginDesc(reflections, Transformation.class, loader)
+ getPluginDesc(reflections, Transformation.class, loader),
+ getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
);
}
@@ -295,23 +306,29 @@ public class DelegatingClassLoader extends URLClassLoader {
Collection> result = new ArrayList<>();
for (Class extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
- // Temporary workaround until all the plugins are versioned.
- if (Connector.class.isAssignableFrom(plugin)) {
- result.add(
- new PluginDesc<>(
- plugin,
- ((Connector) plugin.newInstance()).version(),
- loader
- )
- );
- } else {
- result.add(new PluginDesc<>(plugin, "undefined", loader));
- }
+ result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader));
+ } else {
+ log.debug("Skipping {} as it is not concrete implementation", plugin);
}
}
return result;
}
+ private Collection> getServiceLoaderPluginDesc(Class klass,
+ ClassLoader loader) {
+
+ ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
+ Collection> result = new ArrayList<>();
+ for (T impl : serviceLoader) {
+ result.add(new PluginDesc<>(klass, versionFor(impl), loader));
+ }
+ return result;
+ }
+
+ private static String versionFor(T pluginImpl) {
+ return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : "undefined";
+ }
+
@Override
protected Class> loadClass(String name, boolean resolve) throws ClassNotFoundException {
if (!PluginUtils.shouldLoadInIsolation(name)) {
@@ -337,6 +354,7 @@ public class DelegatingClassLoader extends URLClassLoader {
addAliases(converters);
addAliases(headerConverters);
addAliases(transformations);
+ addAliases(restExtensions);
}
private void addAliases(Collection> plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index c680f08..6f48e56 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
@@ -28,17 +29,20 @@ public class PluginScanResult {
private final Collection> converters;
private final Collection> headerConverters;
private final Collection> transformations;
+ private final Collection> restExtensions;
public PluginScanResult(
Collection> connectors,
Collection> converters,
Collection> headerConverters,
- Collection> transformations
+ Collection> transformations,
+ Collection> restExtensions
) {
this.connectors = connectors;
this.converters = converters;
this.headerConverters = headerConverters;
this.transformations = transformations;
+ this.restExtensions = restExtensions;
}
public Collection> connectors() {
@@ -57,10 +61,15 @@ public class PluginScanResult {
return transformations;
}
+ public Collection> restExtensions() {
+ return restExtensions;
+ }
+
public boolean isEmpty() {
return connectors().isEmpty()
&& converters().isEmpty()
&& headerConverters().isEmpty()
- && transformations().isEmpty();
+ && transformations().isEmpty()
+ && restExtensions().isEmpty();
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 5649213..918f9d7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
@@ -30,6 +31,7 @@ public enum PluginType {
CONNECTOR(Connector.class),
CONVERTER(Converter.class),
TRANSFORMATION(Transformation.class),
+ REST_EXTENSION(ConnectRestExtension.class),
UNKNOWN(Object.class);
private Class> klass;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 30c41cd..9607410 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
@@ -316,6 +318,53 @@ public class Plugins {
return plugin;
}
+
+ /**
+ * If the given class names are available in the classloader, return a list of new configured
+ * instances. If the instances implement {@link Configurable}, they are configured with provided {@param config}
+ *
+ * @param klassNames the list of class names of plugins that needs to instantiated and configured
+ * @param config the configuration containing the {@link org.apache.kafka.connect.runtime.Worker}'s configuration; may not be {@code null}
+ * @param pluginKlass the type of the plugin class that is being instantiated
+ * @return the instantiated and configured list of plugins of type ; empty list if the {@param klassNames} is {@code null} or empty
+ * @throws ConnectException if the implementation class could not be found
+ */
+ public List newPlugins(List klassNames, AbstractConfig config, Class pluginKlass) {
+ List plugins = new ArrayList<>();
+ if (klassNames != null) {
+ for (String klassName : klassNames) {
+ plugins.add(newPlugin(klassName, config, pluginKlass));
+ }
+ }
+ return plugins;
+ }
+
+ public T newPlugin(String klassName, AbstractConfig config, Class pluginKlass) {
+ T plugin;
+ Class extends T> klass;
+ try {
+ klass = pluginClass(delegatingLoader, klassName, pluginKlass);
+ } catch (ClassNotFoundException e) {
+ String msg = String.format("Failed to find any class that implements %s and which "
+ + "name matches %s", pluginKlass, klassName);
+ throw new ConnectException(msg);
+ }
+ plugin = newPlugin(klass);
+ if (plugin == null) {
+ throw new ConnectException("Unable to instantiate '" + klassName + "'");
+ }
+ if (plugin instanceof Versioned) {
+ Versioned versionedPlugin = (Versioned) plugin;
+ if (versionedPlugin.version() == null || versionedPlugin.version().trim().isEmpty()) {
+ throw new ConnectException("Version not defined for '" + klassName + "'");
+ }
+ }
+ if (plugin instanceof Configurable) {
+ ((Configurable) plugin).configure(config.originals());
+ }
+ return plugin;
+ }
+
/**
* Get an instance of the give class specified by the given configuration key.
*
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
new file mode 100644
index 0000000..c9c2c3b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.glassfish.jersey.server.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+import javax.ws.rs.core.Configurable;
+import javax.ws.rs.core.Configuration;
+
+/**
+ * The implementation delegates to {@link ResourceConfig} so that we can handle duplicate
+ * registrations deterministically by not re-registering them again.
+ */
+public class ConnectRestConfigurable implements Configurable {
+
+ private static final Logger log = LoggerFactory.getLogger(ConnectRestConfigurable.class);
+
+ private static final boolean ALLOWED_TO_REGISTER = true;
+ private static final boolean NOT_ALLOWED_TO_REGISTER = false;
+
+ private ResourceConfig resourceConfig;
+
+ public ConnectRestConfigurable(ResourceConfig resourceConfig) {
+ Objects.requireNonNull(resourceConfig, "ResourceConfig can't be null");
+ this.resourceConfig = resourceConfig;
+ }
+
+
+ @Override
+ public Configuration getConfiguration() {
+ return resourceConfig.getConfiguration();
+ }
+
+ @Override
+ public ResourceConfig property(String name, Object value) {
+ return resourceConfig.property(name, value);
+ }
+
+ @Override
+ public ResourceConfig register(Object component) {
+ if (allowedToRegister(component)) {
+ resourceConfig.register(component);
+ }
+ return resourceConfig;
+ }
+
+ @Override
+ public ResourceConfig register(Object component, int priority) {
+ if (allowedToRegister(component)) {
+ resourceConfig.register(component, priority);
+ }
+ return resourceConfig;
+ }
+
+ @Override
+ public ResourceConfig register(Object component, Map contracts) {
+ if (allowedToRegister(component)) {
+ resourceConfig.register(component, contracts);
+ }
+ return resourceConfig;
+ }
+
+ @Override
+ public ResourceConfig register(Object component, Class[] contracts) {
+ if (allowedToRegister(component)) {
+ resourceConfig.register(component, contracts);
+ }
+ return resourceConfig;
+ }
+
+ @Override
+ public ResourceConfig register(Class componentClass, Map contracts) {
+ if (allowedToRegister(componentClass)) {
+ resourceConfig.register(componentClass, contracts);
+ }
+ return resourceConfig;
+ }
+
+ @Override
+ public ResourceConfig register(Class componentClass, Class[] contracts) {
+ if (allowedToRegister(componentClass)) {
+ resourceConfig.register(componentClass, contracts);
+ }
+ return resourceConfig;
+ }
+
+ @Override
+ public ResourceConfig register(Class componentClass, int priority) {
+ if (allowedToRegister(componentClass)) {
+ resourceConfig.register(componentClass, priority);
+ }
+ return resourceConfig;
+ }
+
+ @Override
+ public ResourceConfig register(Class componentClass) {
+ if (allowedToRegister(componentClass)) {
+ resourceConfig.register(componentClass);
+ }
+ return resourceConfig;
+ }
+
+ private boolean allowedToRegister(Object component) {
+ if (resourceConfig.isRegistered(component)) {
+ log.warn("The resource {} is already registered", component);
+ return NOT_ALLOWED_TO_REGISTER;
+ }
+ return ALLOWED_TO_REGISTER;
+ }
+
+ private boolean allowedToRegister(Class componentClass) {
+ if (resourceConfig.isRegistered(componentClass)) {
+ log.warn("The resource {} is already registered", componentClass);
+ return NOT_ALLOWED_TO_REGISTER;
+ }
+ return ALLOWED_TO_REGISTER;
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
new file mode 100644
index 0000000..cdf282f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+
+import javax.ws.rs.core.Configurable;
+
+public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext {
+
+ private Configurable extends Configurable> configurable;
+ private ConnectClusterState clusterState;
+
+ public ConnectRestExtensionContextImpl(
+ Configurable extends Configurable> configurable,
+ ConnectClusterState clusterState
+ ) {
+ this.configurable = configurable;
+ this.clusterState = clusterState;
+ }
+
+ @Override
+ public Configurable extends Configurable> configurable() {
+ return configurable;
+ }
+
+ @Override
+ public ConnectClusterState clusterState() {
+ return clusterState;
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 8d7803e..5a589db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -17,10 +17,14 @@
package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -45,8 +49,7 @@ import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -56,6 +59,9 @@ import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
+
/**
* Embedded server for the REST API that provides the control plane for Kafka Connect workers.
*/
@@ -71,6 +77,8 @@ public class RestServer {
private final WorkerConfig config;
private Server jettyServer;
+ private List connectRestExtensions = Collections.EMPTY_LIST;
+
/**
* Create a REST server for this herder using the specified configs.
*/
@@ -163,6 +171,8 @@ public class RestServer {
resourceConfig.register(ConnectExceptionMapper.class);
+ registerRestExtensions(herder, resourceConfig);
+
ServletContainer servletContainer = new ServletContainer(resourceConfig);
ServletHolder servletHolder = new ServletHolder(servletContainer);
@@ -207,10 +217,19 @@ public class RestServer {
log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
}
+
+
public void stop() {
log.info("Stopping REST server");
try {
+ for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
+ try {
+ connectRestExtension.close();
+ } catch (IOException e) {
+ log.warn("Error while invoking close on " + connectRestExtension.getClass(), e);
+ }
+ }
jettyServer.stop();
jettyServer.join();
} catch (Exception e) {
@@ -280,6 +299,22 @@ public class RestServer {
return null;
}
+ void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
+ connectRestExtensions = herder.plugins().newPlugins(
+ config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+ config, ConnectRestExtension.class);
+
+ ConnectRestExtensionContext connectRestExtensionContext =
+ new ConnectRestExtensionContextImpl(
+ new ConnectRestConfigurable(resourceConfig),
+ new ConnectClusterStateImpl(herder)
+ );
+ for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
+ connectRestExtension.register(connectRestExtensionContext);
+ }
+
+ }
+
public static String urlJoin(String base, String path) {
if (base.endsWith("/") && path.startsWith("/"))
return base + path.substring(1);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 25c2cb1..ca26e4e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -71,7 +71,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*",
+ "org.apache.kafka.connect.runtime.isolation.*"})
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic";
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 877fe6b..5c8aa29 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.storage.Converter;
@@ -37,6 +39,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -142,6 +145,26 @@ public class PluginsTest {
}
@Test
+ public void shouldInstantiateAndConfigureConnectRestExtension() {
+ props.put(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
+ TestConnectRestExtension.class.getName());
+ createConfig();
+
+ List connectRestExtensions =
+ plugins.newPlugins(config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+ config,
+ ConnectRestExtension.class);
+ assertNotNull(connectRestExtensions);
+ assertEquals("One Rest Extension expected", 1, connectRestExtensions.size());
+ assertNotNull(connectRestExtensions.get(0));
+ assertTrue("Should be instance of TestConnectRestExtension",
+ connectRestExtensions.get(0) instanceof TestConnectRestExtension);
+ assertNotNull(((TestConnectRestExtension) connectRestExtensions.get(0)).configs);
+ assertEquals(config.originals(),
+ ((TestConnectRestExtension) connectRestExtensions.get(0)).configs);
+ }
+
+ @Test
public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
createConfig();
@@ -243,6 +266,30 @@ public class PluginsTest {
}
}
+
+ public static class TestConnectRestExtension implements ConnectRestExtension {
+
+ public Map configs;
+
+ @Override
+ public void register(ConnectRestExtensionContext restPluginContext) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(Map configs) {
+ this.configs = configs;
+ }
+
+ @Override
+ public String version() {
+ return "test";
+ }
+ }
+
public static class TestInternalConverter extends JsonConverter {
public Map configs;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index d26aa04..2f8704a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -17,9 +17,11 @@
package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.util.Callback;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -32,18 +34,20 @@ import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.modules.junit4.PowerMockRunner;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@@ -51,6 +55,8 @@ public class RestServerTest {
@MockStrict
private Herder herder;
+ @MockStrict
+ private Plugins plugins;
private RestServer server;
@After
@@ -151,8 +157,19 @@ public class RestServerTest {
public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) {
// To be able to set the Origin, we need to toggle this flag
+
+ Map workerProps = baseWorkerProps();
+ workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
+ workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
+ WorkerConfig workerConfig = new DistributedConfig(workerProps);
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
+ EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+ EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST,
+ workerConfig,
+ ConnectRestExtension.class))
+ .andStubReturn(Collections.EMPTY_LIST);
+
final Capture>> connectorsCallback = EasyMock.newCapture();
herder.connectors(EasyMock.capture(connectorsCallback));
PowerMock.expectLastCall().andAnswer(new IAnswer