From: ewencp@apache.org
To: "commits@kafka.apache.org"
Date: Wed, 30 May 2018 04:36:10 +0000
Subject: [kafka] branch trunk updated: KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285)
Message-ID: <152765497000.11794.14412370610330420440@gitbox.apache.org>
Reply-To: dev@kafka.apache.org
Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm
X-Git-Repo: kafka
X-Git-Refname: refs/heads/trunk
X-Git-Oldrev: fffb9c5b5cac1a669f22dd99860774d6c0fdb94b
X-Git-Newrev: 98094954a27849b027831a7401e965c6a949790c

This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9809495 KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285) 9809495 is described below commit 98094954a27849b027831a7401e965c6a949790c Author: Magesh Nandakumar AuthorDate: Tue May 29 21:35:22 2018 -0700 KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285) This PR provides the implementation for KIP-285 and also a reference implementation for authenticating BasicAuth credentials using JAAS LoginModule Author: Magesh Nandakumar Reviewers: Randall Hauch , Arjun Satish , Konstantine Karantasis , Ewen Cheslack-Postava Closes #4931 from mageshn/KIP-285 --- build.gradle | 40 +++++ checkstyle/import-control.xml | 14 +- .../apache/kafka/connect/components/Versioned.java | 30 ++++ .../apache/kafka/connect/connector/Connector.java | 9 +- .../apache/kafka/connect/health/AbstractState.java | 74 +++++++++ .../kafka/connect/health/ConnectClusterState.java | 46 ++++++ .../kafka/connect/health/ConnectorHealth.java | 86 +++++++++++ .../kafka/connect/health/ConnectorState.java | 35 +++++ .../apache/kafka/connect/health/ConnectorType.java | 43 ++++++ .../org/apache/kafka/connect/health/TaskState.java | 69 +++++++++ .../kafka/connect/rest/ConnectRestExtension.java | 58 +++++++ .../connect/rest/ConnectRestExtensionContext.java | 44 ++++++ .../extenstion/BasicAuthSecurityRestExtension.java | 79 ++++++++++ .../basic/auth/extenstion/JaasBasicAuthFilter.java | 100 ++++++++++++ .../auth/extenstion/PropertyFileLoginModule.java | 116 ++++++++++++++ ....apache.kafka.connect.rest.ConnectRestExtension | 16 ++ .../auth/extenstion/JaasBasicAuthFilterTest.java | 168 +++++++++++++++++++++ .../apache/kafka/connect/runtime/WorkerConfig.java | 11 +- .../runtime/health/ConnectClusterStateImpl.java | 86 +++++++++++ .../runtime/isolation/DelegatingClassLoader.java | 44 ++++-- 
.../runtime/isolation/PluginScanResult.java | 13 +- .../connect/runtime/isolation/PluginType.java | 2 + .../kafka/connect/runtime/isolation/Plugins.java | 49 ++++++ .../runtime/rest/ConnectRestConfigurable.java | 138 +++++++++++++++++ .../rest/ConnectRestExtensionContextImpl.java | 47 ++++++ .../kafka/connect/runtime/rest/RestServer.java | 39 ++++- .../connect/runtime/WorkerSourceTaskTest.java | 3 +- .../connect/runtime/isolation/PluginsTest.java | 47 ++++++ .../kafka/connect/runtime/rest/RestServerTest.java | 32 ++-- ....apache.kafka.connect.rest.ConnectRestExtension | 16 ++ settings.gradle | 2 +- 31 files changed, 1519 insertions(+), 37 deletions(-) diff --git a/build.gradle b/build.gradle index 474329a..4f3fd77 100644 --- a/build.gradle +++ b/build.gradle @@ -1207,6 +1207,7 @@ project(':connect:api') { dependencies { compile project(':clients') compile libs.slf4jApi + compile libs.jerseyContainerServlet testCompile libs.junit @@ -1431,6 +1432,45 @@ project(':connect:file') { } } +project(':connect:basic-auth-extension') { + archivesBaseName = "connect-basic-auth-extension" + + dependencies { + compile project(':connect:api') + compile libs.slf4jApi + + testCompile libs.bcpkix + testCompile libs.easymock + testCompile libs.junit + testCompile libs.powermockJunit4 + testCompile libs.powermockEasymock + testCompile project(':clients').sourceSets.test.output + + testRuntime libs.slf4jlog4j + } + + javadoc { + enabled = false + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.testRuntime) { + include('slf4j-log4j12*') + include('log4j*jar') + } + from (configurations.runtime) { + exclude('kafka-clients*') + exclude('connect-*') + } + into "$buildDir/dependant-libs" + duplicatesStrategy 'exclude' + } + + jar { + dependsOn copyDependantLibs + } +} + task aggregatedJavadoc(type: Javadoc) { def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled } source = projectsWithJavadoc.collect { it.sourceSets.main.allJava } diff --git 
a/checkstyle/import-control.xml b/checkstyle/import-control.xml index e4f9a4e..5549205 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -290,6 +290,7 @@ + @@ -307,7 +308,16 @@ - + + + + + + + + + + @@ -327,6 +337,8 @@ + + diff --git a/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java new file mode 100644 index 0000000..adabe8f --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.components; + +/** + * Connect requires some components implement this interface to define a version string. + */ +public interface Versioned { + /** + * Get the version of this component. + * + * @return the version, formatted as a String. The version may not be {@code null} or empty. 
+ */ + String version(); +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java index 30dfd3c..329429b 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.components.Versioned; import java.util.List; import java.util.Map; @@ -41,16 +42,10 @@ import java.util.Map; * Tasks. *

*/ -public abstract class Connector { +public abstract class Connector implements Versioned { protected ConnectorContext context; - /** - * Get the version of this connector. - * - * @return the version, formatted as a String - */ - public abstract String version(); /** * Initialize this connector, using the provided ConnectorContext to notify the runtime of diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java new file mode 100644 index 0000000..a18c463 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.health; + +/** + * Provides the current status along with identifier for Connect worker and tasks. + */ +public abstract class AbstractState { + + private final String state; + private final String traceMessage; + private final String workerId; + + /** + * Construct a state for connector or task. 
+ * + * @param state the status of connector or task; may not be null or empty + * @param workerId the workerId associated with the connector or the task; may not be null or empty + * @param traceMessage any error trace message associated with the connector or the task; may be null or empty + */ + public AbstractState(String state, String workerId, String traceMessage) { + if (state == null || state.trim().isEmpty()) { + throw new IllegalArgumentException("State must not be null or empty"); + } + if (workerId == null || workerId.trim().isEmpty()) { + throw new IllegalArgumentException("Worker ID must not be null or empty"); + } + this.state = state; + this.workerId = workerId; + this.traceMessage = traceMessage; + } + + /** + * Provides the current state of the connector or task. + * + * @return state, never {@code null} or empty + */ + public String state() { + return state; + } + + /** + * The identifier of the worker associated with the connector or the task. + * + * @return workerId, never {@code null} or empty. + */ + public String workerId() { + return workerId; + } + + /** + * The error message associated with the connector or task. + * + * @return traceMessage, can be {@code null} or empty. + */ + public String traceMessage() { + return traceMessage; + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java new file mode 100644 index 0000000..d4292ef --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.health; + +import java.util.Collection; + +/** + * Provides the ability to look up connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension} + * implementations. The Connect framework provides the implementation for this interface. + */ +public interface ConnectClusterState { + + /** + * Get the names of the connectors currently deployed in this cluster. This is a full list of connectors in the cluster gathered from + * the current configuration, which may change over time. + * + * @return collection of connector names, never {@code null} + */ + Collection<String> connectors(); + + /** + * Look up the current health of a connector and its tasks. This provides the current snapshot of health by querying the underlying + * herder. A connector returned by a previous invocation of {@link #connectors()} may no longer be available and could result in {@link + * org.apache.kafka.connect.errors.NotFoundException}. 
+ * + * @param connName name of the connector + * @return the health of the connector for the connector name + * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found + */ + ConnectorHealth connectorHealth(String connName); +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java new file mode 100644 index 0000000..3a9efd1 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.health; + + +import java.util.Map; +import java.util.Objects; + +/** + * Provides basic health information about the connector and its tasks. 
+ */ +public class ConnectorHealth { + + private final String name; + private final ConnectorState connectorState; + private final Map<Integer, TaskState> tasks; + private final ConnectorType type; + + + public ConnectorHealth(String name, + ConnectorState connectorState, + Map<Integer, TaskState> tasks, + ConnectorType type) { + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Connector name is required"); + } + Objects.requireNonNull(connectorState, "connectorState can't be null"); + Objects.requireNonNull(tasks, "tasks can't be null"); + Objects.requireNonNull(type, "type can't be null"); + this.name = name; + this.connectorState = connectorState; + this.tasks = tasks; + this.type = type; + } + + /** + * Provides the name of the connector. + * + * @return name, never {@code null} or empty + */ + public String name() { + return name; + } + + /** + * Provides the current state of the connector. + * + * @return the connector state, never {@code null} + */ + public ConnectorState connectorState() { + return connectorState; + } + + /** + * Provides the current state of the connector tasks. + * + * @return the state for each task ID; never {@code null} + */ + public Map<Integer, TaskState> tasksState() { + return tasks; + } + + /** + * Provides the type of the connector. + * + * @return type, never {@code null} + */ + public ConnectorType type() { + return type; + } + +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java new file mode 100644 index 0000000..d5571bc --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.health; + +/** + * Describes the status, worker ID, and any errors associated with a connector. + */ +public class ConnectorState extends AbstractState { + + /** + * Provides an instance of the ConnectorState. + * + * @param state - the status of connector, may not be {@code null} or empty + * @param workerId - the workerId associated with the connector, may not be {@code null} or empty + * @param traceMessage - any error message associated with the connector, may be {@code null} or empty + */ + public ConnectorState(String state, String workerId, String traceMessage) { + super(state, workerId, traceMessage); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java new file mode 100644 index 0000000..fa9db6f --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.health; + +import java.util.Locale; + +/** + * Enum definition that identifies the type of the connector. + */ +public enum ConnectorType { + /** + * Identifies a source connector + */ + SOURCE, + /** + * Identifies a sink connector + */ + SINK, + /** + * Identifies a connector whose type could not be inferred + */ + UNKNOWN; + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java new file mode 100644 index 0000000..1c1be15 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.health; + +import java.util.Objects; + +/** + * Describes the state, IDs, and any errors of a connector task. + */ +public class TaskState extends AbstractState { + + private final int taskId; + + /** + * Provides an instance of {@link TaskState}. + * + * @param taskId the id associated with the connector task + * @param state the status of the task, may not be {@code null} or empty + * @param workerId id of the worker the task is associated with, may not be {@code null} or empty + * @param trace error message if that task had failed or errored out, may be {@code null} or empty + */ + public TaskState(int taskId, String state, String workerId, String trace) { + super(state, workerId, trace); + this.taskId = taskId; + } + + /** + * Provides the ID of the task. + * + * @return the task ID + */ + public int taskId() { + return taskId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TaskState taskState = (TaskState) o; + + return taskId == taskState.taskId; + } + + @Override + public int hashCode() { + return Objects.hash(taskId); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java new file mode 100644 index 0000000..aa479a3 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.rest; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.health.ConnectClusterState; + +import java.io.Closeable; +import java.util.Map; + +/** + * A plugin interface to allow registration of new JAX-RS resources like Filters, REST endpoints, providers, etc. The implementations will + * be discovered using the standard Java {@link java.util.ServiceLoader} mechanism by Connect's plugin class loading mechanism. + * + *

The extension class(es) must be packaged as a plugin, with one JAR containing the implementation classes and a {@code + * META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension} file that contains the fully qualified name of the + * class(es) that implement the ConnectRestExtension interface. The plugin should also include the JARs of all dependencies except those + * already provided by the Connect framework. + * + *

To install into a Connect installation, add a directory named for the plugin and containing the plugin's JARs into a directory that is + * on Connect's {@code plugin.path}, and (re)start the Connect worker. + * + *

When the Connect worker process starts up, it will read its configuration and instantiate all of the REST extension implementation + * classes that are specified in the {@code rest.extension.classes} configuration property. Connect will then pass its configuration to each + * extension via the {@link Configurable#configure(Map)} method, and will then call {@link #register} with a provided context. + * + *

When the Connect worker shuts down, it will call the extension's {@link #close} method to allow the implementation to release all of + * its resources. + */ +public interface ConnectRestExtension extends Configurable, Versioned, Closeable { + + /** + * ConnectRestExtension implementations can register custom JAX-RS resources via the {@link #register(ConnectRestExtensionContext)} + * method. The Connect framework will invoke this method after registering the default Connect resources. If an implementation attempts + * to re-register any of the Connect resources, the attempt will be ignored and logged. + * + * @param restPluginContext The context provides access to JAX-RS {@link javax.ws.rs.core.Configurable} and {@link + * ConnectClusterState}. The custom JAX-RS resources can be registered via the {@link + * ConnectRestExtensionContext#configurable()}. + */ + void register(ConnectRestExtensionContext restPluginContext); +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java new file mode 100644 index 0000000..7608597 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.rest; + +import org.apache.kafka.connect.health.ConnectClusterState; + +import javax.ws.rs.core.Configurable; + +/** + * The interface provides the ability for {@link ConnectRestExtension} implementations to access the JAX-RS + * {@link javax.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided + * by the Connect framework. + */ +public interface ConnectRestExtensionContext { + + /** + * Provides an implementation of {@link javax.ws.rs.core.Configurable} that can be used to register JAX-RS resources. + * + * @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null} + */ + Configurable configurable(); + + /** + * Provides the cluster state and health information about the connectors and tasks. + * + * @return the cluster state information; never {@code null} + */ + ConnectClusterState clusterState(); +} diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java new file mode 100644 index 0000000..91d5d9c --- /dev/null +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.rest.basic.auth.extenstion; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.rest.ConnectRestExtension; +import org.apache.kafka.connect.rest.ConnectRestExtensionContext; + +import java.io.IOException; +import java.util.Map; + +/** + * Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link + * javax.security.auth.spi.LoginModule}. An entry with the name {@code KafkaConnect} is expected in the JAAS config file configured in the + * JVM. An implementation of {@link javax.security.auth.spi.LoginModule} needs to be provided in the JAAS config file. The {@code + * LoginModule} implementation should configure the {@link javax.security.auth.callback.CallbackHandler} with only {@link + * javax.security.auth.callback.NameCallback} and {@link javax.security.auth.callback.PasswordCallback}. + * + *

To use this extension, one needs to add the following config in the {@code worker.properties} + *

+ *     rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
+ * 
+ * + *

An example JAAS config would look as below + *

+ *         KafkaConnect {
+ *              org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required
+ *              file="/mnt/secret/credentials.properties";
+ *         };
+ *
+ * + *

This is a reference implementation of the {@link ConnectRestExtension} interface. It registers an implementation of {@link + * javax.ws.rs.container.ContainerRequestFilter} that does JAAS-based authentication of incoming Basic Auth credentials. {@link + * ConnectRestExtension} implementations are loaded via the plugin class loader using the {@link java.util.ServiceLoader} mechanism and hence + * the packaged jar includes {@code META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension} with the entry + * {@code org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension}. + * + *

NOTE: The implementation ships with a default {@link PropertyFileLoginModule} that helps authenticate the request against a + * property file. {@link PropertyFileLoginModule} is NOT intended to be used in production since the credentials are stored in PLAINTEXT. One can use + * this extension in production by using their own implementation of {@link javax.security.auth.spi.LoginModule} that authenticates against + * stores like LDAP, DB, etc. + */ +public class BasicAuthSecurityRestExtension implements ConnectRestExtension { + + @Override + public void register(ConnectRestExtensionContext restPluginContext) { + restPluginContext.configurable().register(JaasBasicAuthFilter.class); + } + + @Override + public void close() throws IOException { + + } + + @Override + public void configure(Map<String, ?> configs) { + + } + + @Override + public String version() { + return AppInfoParser.getVersion(); + } +} diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java new file mode 100644 index 0000000..7231700 --- /dev/null +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.rest.basic.auth.extenstion; + +import org.apache.kafka.common.config.ConfigException; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.core.Response; + +public class JaasBasicAuthFilter implements ContainerRequestFilter { + + private static final String CONNECT_LOGIN_MODULE = "KafkaConnect"; + static final String AUTHORIZATION = "Authorization"; + + @Override + public void filter(ContainerRequestContext requestContext) throws IOException { + + try { + LoginContext loginContext = + new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler( + requestContext.getHeaderString(AUTHORIZATION))); + loginContext.login(); + } catch (LoginException | ConfigException e) { + requestContext.abortWith( + Response.status(Response.Status.UNAUTHORIZED) + .entity("User cannot access the resource.") + .build()); + } + } + + + public static class BasicAuthCallBackHandler implements CallbackHandler { + + private static final String BASIC = "basic"; + private static final char 
COLON = ':'; + private static final char SPACE = ' '; + private String username; + private String password; + + public BasicAuthCallBackHandler(String credentials) { + if (credentials != null) { + int space = credentials.indexOf(SPACE); + if (space > 0) { + String method = credentials.substring(0, space); + if (BASIC.equalsIgnoreCase(method)) { + credentials = credentials.substring(space + 1); + credentials = new String(Base64.getDecoder().decode(credentials), + StandardCharsets.UTF_8); + int i = credentials.indexOf(COLON); + if (i > 0) { + username = credentials.substring(0, i); + password = credentials.substring(i + 1); + } + } + } + } + } + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException { + for (Callback callback : callbacks) { + if (callback instanceof NameCallback) { + ((NameCallback) callback).setName(username); + } else if (callback instanceof PasswordCallback) { + ((PasswordCallback) callback).setPassword(password.toCharArray()); + } else { + throw new UnsupportedCallbackException(callback, "Supports only NameCallback " + + "and PasswordCallback"); + } + } + } + } +} diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java new file mode 100644 index 0000000..7af7863 --- /dev/null +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.rest.basic.auth.extenstion; + +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.security.auth.Subject; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.login.LoginException; +import javax.security.auth.spi.LoginModule; + +/** + * {@link PropertyFileLoginModule} authenticates against a properties file. + * The credentials should be stored in the format {username}={password} in the properties file. + * The absolute path of the file needs to specified using the option file + * + *

NOTE: This implementation is NOT intended to be used in production since the credentials are stored in PLAINTEXT in the + * properties file. + */ +public class PropertyFileLoginModule implements LoginModule { + private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class); + + private CallbackHandler callbackHandler; + private static final String FILE_OPTIONS = "file"; + private String fileName; + private boolean authenticated; + + private static Map<String, Properties> credentialPropertiesMap = new ConcurrentHashMap<>(); + + @Override + public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) { + this.callbackHandler = callbackHandler; + fileName = (String) options.get(FILE_OPTIONS); + if (fileName == null || fileName.trim().isEmpty()) { + throw new ConfigException("Property Credentials file must be specified"); + } + if (!credentialPropertiesMap.containsKey(fileName)) { + Properties credentialProperties = new Properties(); + try { + credentialProperties.load(Files.newInputStream(Paths.get(fileName))); + credentialPropertiesMap.putIfAbsent(fileName, credentialProperties); + } catch (IOException e) { + log.error("Error loading credentials file ", e); + throw new ConfigException("Error loading Property Credentials file"); + } + } + } + + @Override + public boolean login() throws LoginException { + Callback[] callbacks = configureCallbacks(); + try { + callbackHandler.handle(callbacks); + } catch (Exception e) { + throw new LoginException(e.getMessage()); + } + + String username = ((NameCallback) callbacks[0]).getName(); + char[] passwordChars = ((PasswordCallback) callbacks[1]).getPassword(); + String password = passwordChars != null ? 
new String(passwordChars) : null; + Properties credentialProperties = credentialPropertiesMap.get(fileName); + authenticated = credentialProperties.isEmpty() || + (password != null && password.equals(credentialProperties.get(username))); + return authenticated; + } + + @Override + public boolean commit() throws LoginException { + return authenticated; + } + + @Override + public boolean abort() throws LoginException { + return true; + } + + @Override + public boolean logout() throws LoginException { + return true; + } + + private Callback[] configureCallbacks() { + + Callback[] callbacks = new Callback[2]; + callbacks[0] = new NameCallback("Enter user name"); + callbacks[1] = new PasswordCallback("Enter password", false); + return callbacks; + } +} diff --git a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension new file mode 100644 index 0000000..098c947 --- /dev/null +++ b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension \ No newline at end of file diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java new file mode 100644 index 0000000..80299f8 --- /dev/null +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.rest.basic.auth.extenstion; + +import org.apache.kafka.common.security.JaasUtils; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.annotation.MockStrict; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import javax.security.auth.login.Configuration; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.core.Response; + +import static org.powermock.api.easymock.PowerMock.replayAll; + +@RunWith(PowerMockRunner.class) +@PowerMockIgnore("javax.*") +public class JaasBasicAuthFilterTest { + + @MockStrict + private ContainerRequestContext requestContext; + + private JaasBasicAuthFilter jaasBasicAuthFilter = new JaasBasicAuthFilter(); + private String previousJaasConfig; + private Configuration previousConfiguration; + + @Before + public void setup() throws IOException { + EasyMock.reset(requestContext); + } + + @After + public void tearDown() { + if (previousJaasConfig != null) { + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, previousJaasConfig); + } + Configuration.setConfiguration(previousConfiguration); + } + + @Test + public void testSuccess() throws IOException { + File credentialFile = File.createTempFile("credential", ".properties"); + credentialFile.deleteOnExit(); + List lines = new ArrayList<>(); + lines.add("user=password"); + lines.add("user1=password1"); + Files.write(credentialFile.toPath(), lines, StandardCharsets.UTF_8); + + setupJaasConfig("KafkaConnect", credentialFile.getPath(), true); + setMock("Basic", "user", "password", false); + + jaasBasicAuthFilter.filter(requestContext); + } + 
+ + @Test + public void testBadCredential() throws IOException { + setMock("Basic", "user1", "password", true); + jaasBasicAuthFilter.filter(requestContext); + } + + @Test + public void testBadPassword() throws IOException { + setMock("Basic", "user", "password1", true); + jaasBasicAuthFilter.filter(requestContext); + } + + @Test + public void testUnknownBearer() throws IOException { + setMock("Unknown", "user", "password", true); + jaasBasicAuthFilter.filter(requestContext); + } + + @Test + public void testUnknownLoginModule() throws IOException { + setupJaasConfig("KafkaConnect1", "/tmp/testcrednetial", true); + Configuration.setConfiguration(null); + setMock("Basic", "user", "password", true); + jaasBasicAuthFilter.filter(requestContext); + } + + @Test + public void testUnknownCredentialsFile() throws IOException { + setupJaasConfig("KafkaConnect", "/tmp/testcrednetial", true); + Configuration.setConfiguration(null); + setMock("Basic", "user", "password", true); + jaasBasicAuthFilter.filter(requestContext); + } + + @Test + public void testEmptyCredentialsFile() throws IOException { + File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf"); + jaasConfigFile.deleteOnExit(); + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath()); + setupJaasConfig("KafkaConnect", "", true); + Configuration.setConfiguration(null); + setMock("Basic", "user", "password", true); + jaasBasicAuthFilter.filter(requestContext); + } + + @Test + public void testNoFileOption() throws IOException { + File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf"); + jaasConfigFile.deleteOnExit(); + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath()); + setupJaasConfig("KafkaConnect", "", false); + Configuration.setConfiguration(null); + setMock("Basic", "user", "password", true); + jaasBasicAuthFilter.filter(requestContext); + } + + private void setMock(String authorization, String username, String password, boolean exceptionCase) { 
+ String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes()); + EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION)) + .andReturn(authHeader); + if (exceptionCase) { + requestContext.abortWith(EasyMock.anyObject(Response.class)); + EasyMock.expectLastCall(); + } + replayAll(); + } + + private void setupJaasConfig(String loginModule, String credentialFilePath, boolean includeFileOptions) throws IOException { + File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf"); + jaasConfigFile.deleteOnExit(); + previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath()); + + List<String> lines; + lines = new ArrayList<>(); + lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required "); + if (includeFileOptions) { + lines.add("file=\"" + credentialFilePath + "\""); + } + lines.add(";};"); + Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8); + previousConfiguration = Configuration.getConfiguration(); + Configuration.setConfiguration(null); + } + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index c9c32e7..3c76d0f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -190,6 +190,13 @@ public class WorkerConfig extends AbstractConfig { + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins," + "/opt/connectors"; + public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes"; + protected static final String REST_EXTENSION_CLASSES_DOC = + "Comma-separated names of ConnectRestExtension classes, loaded and called " + + "in the order specified. 
Implementing the interface " + + "ConnectRestExtension allows you to inject into Connect's REST API user defined resources like filters. " + + "Typically used to add custom capability like logging, security, etc."; + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; @@ -254,7 +261,9 @@ public class WorkerConfig extends AbstractConfig { ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC) .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, - Importance.LOW, HEADER_CONVERTER_CLASS_DOC); + Importance.LOW, HEADER_CONVERTER_CLASS_DOC) + .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "", + Importance.LOW, REST_EXTENSION_CLASSES_DOC); } private void logInternalConverterDeprecationWarnings(Map props) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java new file mode 100644 index 0000000..a0f7fde --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.health; + +import org.apache.kafka.connect.health.ConnectClusterState; +import org.apache.kafka.connect.health.ConnectorHealth; +import org.apache.kafka.connect.health.ConnectorState; +import org.apache.kafka.connect.health.ConnectorType; +import org.apache.kafka.connect.health.TaskState; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.Callback; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ConnectClusterStateImpl implements ConnectClusterState { + + private Herder herder; + + public ConnectClusterStateImpl(Herder herder) { + this.herder = herder; + } + + @Override + public Collection connectors() { + final Collection connectors = new ArrayList<>(); + herder.connectors(new Callback>() { + @Override + public void onCompletion(Throwable error, Collection result) { + connectors.addAll(result); + } + }); + return connectors; + } + + @Override + public ConnectorHealth connectorHealth(String connName) { + + ConnectorStateInfo state = herder.connectorStatus(connName); + ConnectorState connectorState = new ConnectorState( + state.connector().state(), + state.connector().workerId(), + state.connector().trace() + ); + Map taskStates = taskStates(state.tasks()); + ConnectorHealth connectorHealth = new ConnectorHealth( + connName, + connectorState, + taskStates, + 
ConnectorType.valueOf(state.type().name()) + ); + return connectorHealth; + } + + private Map taskStates(List states) { + + Map taskStates = new HashMap<>(); + + for (ConnectorStateInfo.TaskState state : states) { + taskStates.put( + state.id(), + new TaskState(state.id(), state.workerId(), state.state(), state.trace()) + ); + } + return taskStates; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index c67dfb5..b56bd1a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; @@ -64,6 +66,7 @@ public class DelegatingClassLoader extends URLClassLoader { private final SortedSet> converters; private final SortedSet> headerConverters; private final SortedSet> transformations; + private final SortedSet> restExtensions; private final List pluginPaths; private final Map activePaths; @@ -77,6 +80,7 @@ public class DelegatingClassLoader extends URLClassLoader { this.converters = new TreeSet<>(); this.headerConverters = new TreeSet<>(); this.transformations = new TreeSet<>(); + this.restExtensions = new TreeSet<>(); } public DelegatingClassLoader(List pluginPaths) { @@ -99,6 +103,10 @@ public class DelegatingClassLoader extends URLClassLoader { return transformations; } + public Set> restExtensions() { + return restExtensions; + } + public 
ClassLoader connectorLoader(Connector connector) { return connectorLoader(connector.getClass().getName()); } @@ -228,6 +236,8 @@ public class DelegatingClassLoader extends URLClassLoader { headerConverters.addAll(plugins.headerConverters()); addPlugins(plugins.transformations(), loader); transformations.addAll(plugins.transformations()); + addPlugins(plugins.restExtensions(), loader); + restExtensions.addAll(plugins.restExtensions()); } loadJdbcDrivers(loader); @@ -281,7 +291,8 @@ public class DelegatingClassLoader extends URLClassLoader { getPluginDesc(reflections, Connector.class, loader), getPluginDesc(reflections, Converter.class, loader), getPluginDesc(reflections, HeaderConverter.class, loader), - getPluginDesc(reflections, Transformation.class, loader) + getPluginDesc(reflections, Transformation.class, loader), + getServiceLoaderPluginDesc(ConnectRestExtension.class, loader) ); } @@ -295,23 +306,29 @@ public class DelegatingClassLoader extends URLClassLoader { Collection> result = new ArrayList<>(); for (Class plugin : plugins) { if (PluginUtils.isConcrete(plugin)) { - // Temporary workaround until all the plugins are versioned. - if (Connector.class.isAssignableFrom(plugin)) { - result.add( - new PluginDesc<>( - plugin, - ((Connector) plugin.newInstance()).version(), - loader - ) - ); - } else { - result.add(new PluginDesc<>(plugin, "undefined", loader)); - } + result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader)); + } else { + log.debug("Skipping {} as it is not concrete implementation", plugin); } } return result; } + private Collection> getServiceLoaderPluginDesc(Class klass, + ClassLoader loader) { + + ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); + Collection> result = new ArrayList<>(); + for (T impl : serviceLoader) { + result.add(new PluginDesc<>(klass, versionFor(impl), loader)); + } + return result; + } + + private static String versionFor(T pluginImpl) { + return pluginImpl instanceof Versioned ? 
((Versioned) pluginImpl).version() : "undefined"; + } + @Override protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { if (!PluginUtils.shouldLoadInIsolation(name)) { @@ -337,6 +354,7 @@ public class DelegatingClassLoader extends URLClassLoader { addAliases(converters); addAliases(headerConverters); addAliases(transformations); + addAliases(restExtensions); } private void addAliases(Collection> plugins) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java index c680f08..6f48e56 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime.isolation; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; @@ -28,17 +29,20 @@ public class PluginScanResult { private final Collection> converters; private final Collection> headerConverters; private final Collection> transformations; + private final Collection> restExtensions; public PluginScanResult( Collection> connectors, Collection> converters, Collection> headerConverters, - Collection> transformations + Collection> transformations, + Collection> restExtensions ) { this.connectors = connectors; this.converters = converters; this.headerConverters = headerConverters; this.transformations = transformations; + this.restExtensions = restExtensions; } public Collection> connectors() { @@ -57,10 +61,15 @@ public class PluginScanResult { return transformations; } + public Collection> restExtensions() { + return 
restExtensions; + } + public boolean isEmpty() { return connectors().isEmpty() && converters().isEmpty() && headerConverters().isEmpty() - && transformations().isEmpty(); + && transformations().isEmpty() + && restExtensions().isEmpty(); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java index 5649213..918f9d7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime.isolation; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.Converter; @@ -30,6 +31,7 @@ public enum PluginType { CONNECTOR(Connector.class), CONVERTER(Converter.class), TRANSFORMATION(Transformation.class), + REST_EXTENSION(ConnectRestExtension.class), UNKNOWN(Object.class); private Class klass; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 30c41cd..9607410 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.components.Versioned; import 
org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; @@ -316,6 +318,53 @@ public class Plugins { return plugin; } + + /** + * If the given class names are available in the classloader, return a list of new configured + * instances. If the instances implement {@link Configurable}, they are configured with provided {@param config} + * + * @param klassNames the list of class names of plugins that needs to instantiated and configured + * @param config the configuration containing the {@link org.apache.kafka.connect.runtime.Worker}'s configuration; may not be {@code null} + * @param pluginKlass the type of the plugin class that is being instantiated + * @return the instantiated and configured list of plugins of type ; empty list if the {@param klassNames} is {@code null} or empty + * @throws ConnectException if the implementation class could not be found + */ + public List newPlugins(List klassNames, AbstractConfig config, Class pluginKlass) { + List plugins = new ArrayList<>(); + if (klassNames != null) { + for (String klassName : klassNames) { + plugins.add(newPlugin(klassName, config, pluginKlass)); + } + } + return plugins; + } + + public T newPlugin(String klassName, AbstractConfig config, Class pluginKlass) { + T plugin; + Class klass; + try { + klass = pluginClass(delegatingLoader, klassName, pluginKlass); + } catch (ClassNotFoundException e) { + String msg = String.format("Failed to find any class that implements %s and which " + + "name matches %s", pluginKlass, klassName); + throw new ConnectException(msg); + } + plugin = newPlugin(klass); + if (plugin == null) { + throw new ConnectException("Unable to instantiate '" + klassName + "'"); + } + if (plugin instanceof Versioned) { + Versioned versionedPlugin = (Versioned) plugin; + if (versionedPlugin.version() == null || versionedPlugin.version().trim().isEmpty()) { + throw new ConnectException("Version not defined 
for '" + klassName + "'"); + } + } + if (plugin instanceof Configurable) { + ((Configurable) plugin).configure(config.originals()); + } + return plugin; + } + /** * Get an instance of the give class specified by the given configuration key. * diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java new file mode 100644 index 0000000..c9c2c3b --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.rest; + +import org.glassfish.jersey.server.ResourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; + +import javax.ws.rs.core.Configurable; +import javax.ws.rs.core.Configuration; + +/** + * The implementation delegates to {@link ResourceConfig} so that we can handle duplicate + * registrations deterministically by not re-registering them again. 
+ */ +public class ConnectRestConfigurable implements Configurable { + + private static final Logger log = LoggerFactory.getLogger(ConnectRestConfigurable.class); + + private static final boolean ALLOWED_TO_REGISTER = true; + private static final boolean NOT_ALLOWED_TO_REGISTER = false; + + private ResourceConfig resourceConfig; + + public ConnectRestConfigurable(ResourceConfig resourceConfig) { + Objects.requireNonNull(resourceConfig, "ResourceConfig can't be null"); + this.resourceConfig = resourceConfig; + } + + + @Override + public Configuration getConfiguration() { + return resourceConfig.getConfiguration(); + } + + @Override + public ResourceConfig property(String name, Object value) { + return resourceConfig.property(name, value); + } + + @Override + public ResourceConfig register(Object component) { + if (allowedToRegister(component)) { + resourceConfig.register(component); + } + return resourceConfig; + } + + @Override + public ResourceConfig register(Object component, int priority) { + if (allowedToRegister(component)) { + resourceConfig.register(component, priority); + } + return resourceConfig; + } + + @Override + public ResourceConfig register(Object component, Map contracts) { + if (allowedToRegister(component)) { + resourceConfig.register(component, contracts); + } + return resourceConfig; + } + + @Override + public ResourceConfig register(Object component, Class[] contracts) { + if (allowedToRegister(component)) { + resourceConfig.register(component, contracts); + } + return resourceConfig; + } + + @Override + public ResourceConfig register(Class componentClass, Map contracts) { + if (allowedToRegister(componentClass)) { + resourceConfig.register(componentClass, contracts); + } + return resourceConfig; + } + + @Override + public ResourceConfig register(Class componentClass, Class[] contracts) { + if (allowedToRegister(componentClass)) { + resourceConfig.register(componentClass, contracts); + } + return resourceConfig; + } + + @Override + public 
ResourceConfig register(Class componentClass, int priority) { + if (allowedToRegister(componentClass)) { + resourceConfig.register(componentClass, priority); + } + return resourceConfig; + } + + @Override + public ResourceConfig register(Class componentClass) { + if (allowedToRegister(componentClass)) { + resourceConfig.register(componentClass); + } + return resourceConfig; + } + + private boolean allowedToRegister(Object component) { + if (resourceConfig.isRegistered(component)) { + log.warn("The resource {} is already registered", component); + return NOT_ALLOWED_TO_REGISTER; + } + return ALLOWED_TO_REGISTER; + } + + private boolean allowedToRegister(Class componentClass) { + if (resourceConfig.isRegistered(componentClass)) { + log.warn("The resource {} is already registered", componentClass); + return NOT_ALLOWED_TO_REGISTER; + } + return ALLOWED_TO_REGISTER; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java new file mode 100644 index 0000000..cdf282f --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.rest; + +import org.apache.kafka.connect.health.ConnectClusterState; +import org.apache.kafka.connect.rest.ConnectRestExtensionContext; + +import javax.ws.rs.core.Configurable; + +public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext { + + private Configurable configurable; + private ConnectClusterState clusterState; + + public ConnectRestExtensionContextImpl( + Configurable configurable, + ConnectClusterState clusterState + ) { + this.configurable = configurable; + this.clusterState = clusterState; + } + + @Override + public Configurable configurable() { + return configurable; + } + + @Override + public ConnectClusterState clusterState() { + return clusterState; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 8d7803e..5a589db 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -17,10 +17,14 @@ package org.apache.kafka.connect.runtime.rest; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.rest.ConnectRestExtension; +import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; import 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; @@ -45,8 +49,7 @@ import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.DispatcherType; -import javax.ws.rs.core.UriBuilder; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -56,6 +59,9 @@ import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.servlet.DispatcherType; +import javax.ws.rs.core.UriBuilder; + /** * Embedded server for the REST API that provides the control plane for Kafka Connect workers. */ @@ -71,6 +77,8 @@ public class RestServer { private final WorkerConfig config; private Server jettyServer; + private List connectRestExtensions = Collections.EMPTY_LIST; + /** * Create a REST server for this herder using the specified configs. */ @@ -163,6 +171,8 @@ public class RestServer { resourceConfig.register(ConnectExceptionMapper.class); + registerRestExtensions(herder, resourceConfig); + ServletContainer servletContainer = new ServletContainer(resourceConfig); ServletHolder servletHolder = new ServletHolder(servletContainer); @@ -207,10 +217,19 @@ public class RestServer { log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl()); } + + public void stop() { log.info("Stopping REST server"); try { + for (ConnectRestExtension connectRestExtension : connectRestExtensions) { + try { + connectRestExtension.close(); + } catch (IOException e) { + log.warn("Error while invoking close on " + connectRestExtension.getClass(), e); + } + } jettyServer.stop(); jettyServer.join(); } catch (Exception e) { @@ -280,6 +299,22 @@ public class RestServer { return null; } + void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) { + connectRestExtensions = herder.plugins().newPlugins( + config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG), + config, 
ConnectRestExtension.class); + + ConnectRestExtensionContext connectRestExtensionContext = + new ConnectRestExtensionContextImpl( + new ConnectRestConfigurable(resourceConfig), + new ConnectClusterStateImpl(herder) + ); + for (ConnectRestExtension connectRestExtension : connectRestExtensions) { + connectRestExtension.register(connectRestExtensionContext); + } + + } + public static String urlJoin(String base, String path) { if (base.endsWith("/") && path.startsWith("/")) return base + path.substring(1); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 25c2cb1..ca26e4e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -71,7 +71,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -@PowerMockIgnore("javax.management.*") +@PowerMockIgnore({"javax.management.*", + "org.apache.kafka.connect.runtime.isolation.*"}) @RunWith(PowerMockRunner.class) public class WorkerSourceTaskTest extends ThreadedTest { private static final String TOPIC = "topic"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 877fe6b..5c8aa29 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; +import 
org.apache.kafka.connect.rest.ConnectRestExtension; +import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.storage.Converter; @@ -37,6 +39,7 @@ import org.junit.Test; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -142,6 +145,26 @@ public class PluginsTest { } @Test + public void shouldInstantiateAndConfigureConnectRestExtension() { + props.put(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG, + TestConnectRestExtension.class.getName()); + createConfig(); + + List connectRestExtensions = + plugins.newPlugins(config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG), + config, + ConnectRestExtension.class); + assertNotNull(connectRestExtensions); + assertEquals("One Rest Extension expected", 1, connectRestExtensions.size()); + assertNotNull(connectRestExtensions.get(0)); + assertTrue("Should be instance of TestConnectRestExtension", + connectRestExtensions.get(0) instanceof TestConnectRestExtension); + assertNotNull(((TestConnectRestExtension) connectRestExtensions.get(0)).configs); + assertEquals(config.originals(), + ((TestConnectRestExtension) connectRestExtensions.get(0)).configs); + } + + @Test public void shouldInstantiateAndConfigureDefaultHeaderConverter() { props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG); createConfig(); @@ -243,6 +266,30 @@ public class PluginsTest { } } + + public static class TestConnectRestExtension implements ConnectRestExtension { + + public Map configs; + + @Override + public void register(ConnectRestExtensionContext restPluginContext) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + this.configs = configs; + } + + @Override + public String version() { + return "test"; + } + } + public 
static class TestInternalConverter extends JsonConverter { public Map configs; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index d26aa04..2f8704a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.connect.runtime.rest; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.util.Callback; import org.easymock.Capture; import org.easymock.EasyMock; @@ -32,18 +34,20 @@ import org.powermock.api.easymock.PowerMock; import org.powermock.api.easymock.annotation.MockStrict; import org.powermock.modules.junit4.PowerMockRunner; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + import static org.junit.Assert.assertEquals; @RunWith(PowerMockRunner.class) @@ -51,6 +55,8 @@ public class RestServerTest { @MockStrict private Herder herder; + @MockStrict + private Plugins plugins; private RestServer server; @After @@ -151,8 
+157,19 @@ public class RestServerTest { public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) { // To be able to set the Origin, we need to toggle this flag + + Map workerProps = baseWorkerProps(); + workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain); + workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method); + WorkerConfig workerConfig = new DistributedConfig(workerProps); System.setProperty("sun.net.http.allowRestrictedHeaders", "true"); + EasyMock.expect(herder.plugins()).andStubReturn(plugins); + EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST, + workerConfig, + ConnectRestExtension.class)) + .andStubReturn(Collections.EMPTY_LIST); + final Capture>> connectorsCallback = EasyMock.newCapture(); herder.connectors(EasyMock.capture(connectorsCallback)); PowerMock.expectLastCall().andAnswer(new IAnswer() { @@ -165,10 +182,7 @@ public class RestServerTest { PowerMock.replayAll(); - Map workerProps = baseWorkerProps(); - workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain); - workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method); - WorkerConfig workerConfig = new DistributedConfig(workerProps); + server = new RestServer(workerConfig); server.start(herder); diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension new file mode 100644 index 0000000..0a1ef88 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. 
+ # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConnectRestExtension \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 7082ddd..bbcdc31 100644 --- a/settings.gradle +++ b/settings.gradle @@ -17,4 +17,4 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scal 'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', - 'jmh-benchmarks' + 'connect:basic-auth-extension', 'jmh-benchmarks'
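Note for readers of this diff: the ConnectRestConfigurable class added in this commit wraps a Jersey ResourceConfig and skips any component that is already registered, logging a warning instead of registering it twice. The snippet below is only a rough, standard-library sketch of that guard. DedupRegistry and its boolean return value are hypothetical and are not part of the commit; the real wrapper delegates to ResourceConfig.isRegistered and always returns the ResourceConfig.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for illustration only: it mimics the
// "skip if already registered" guard, not Jersey's ResourceConfig.
final class DedupRegistry {
    private final Set<Object> components = new LinkedHashSet<>();

    // Returns true if the component was newly registered, false if it was
    // already present and was therefore skipped (an assumption of this sketch;
    // the class in the commit returns the ResourceConfig in both cases).
    boolean register(Object component) {
        if (components.contains(component)) {
            System.err.println("The resource " + component + " is already registered");
            return false;
        }
        components.add(component);
        return true;
    }

    // Number of distinct components registered so far.
    int size() {
        return components.size();
    }
}
```

With this sketch, registering the same class object twice leaves a single entry in the registry, which is the behavior the wrapper's Javadoc describes as handling duplicate registrations deterministically.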