kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-3315: Add REST and Connector API to expose connector configuration
Date Thu, 17 Mar 2016 20:26:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a1eb12d7c -> c07d01722


http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
new file mode 100644
index 0000000..6040563
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * REST entity pairing the definition of one configuration key with its value.
+ * It is read from and written to JSON under the property names
+ * {@code definition} and {@code value}.
+ */
+public class ConfigInfo {
+
+    private final ConfigKeyInfo configKey;
+    private final ConfigValueInfo configValue;
+
+    /**
+     * Creates the pair. Neither argument is validated; a {@code null} is stored as-is.
+     *
+     * @param configKey   entity bound to the {@code definition} JSON property
+     * @param configValue entity bound to the {@code value} JSON property
+     */
+    @JsonCreator
+    public ConfigInfo(
+        @JsonProperty("definition") ConfigKeyInfo configKey,
+        @JsonProperty("value") ConfigValueInfo configValue) {
+        this.configKey = configKey;
+        this.configValue = configValue;
+    }
+
+    /**
+     * @return the key definition, exposed as the {@code definition} JSON property
+     */
+    @JsonProperty("definition")
+    public ConfigKeyInfo configKey() {
+        return configKey;
+    }
+
+    /**
+     * @return the value entity, exposed as the {@code value} JSON property
+     */
+    @JsonProperty("value")
+    public ConfigValueInfo configValue() {
+        return configValue;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and both members are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigInfo that = (ConfigInfo) o;
+        return Objects.equals(configKey, that.configKey) &&
+               Objects.equals(configValue, that.configValue);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(configKey, configValue);
+    }
+
+    /**
+     * @return {@code [definition,value]}; a {@code null} member is rendered as {@code null}
+     */
+    @Override
+    public String toString() {
+        // Plain concatenation goes through String.valueOf, so a null member no longer
+        // triggers the NullPointerException an explicit toString() call would raise.
+        return "[" + configKey + "," + configValue + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
new file mode 100644
index 0000000..3e73983
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * REST entity bundling a name, an error count, a list of group names and a list of
+ * {@link ConfigInfo} entries. The JSON property names are {@code name},
+ * {@code error_count}, {@code groups} and {@code configs}.
+ */
+public class ConfigInfos {
+
+    @JsonProperty("name")
+    private final String name;
+
+    @JsonProperty("error_count")
+    private final int errorCount;
+
+    @JsonProperty("groups")
+    private final List<String> groups;
+
+    @JsonProperty("configs")
+    private final List<ConfigInfo> configs;
+
+    /**
+     * Stores the arguments as given; the lists are neither copied nor validated.
+     *
+     * @param name       value of the {@code name} JSON property
+     * @param errorCount value of the {@code error_count} JSON property
+     * @param groups     value of the {@code groups} JSON property
+     * @param configs    value of the {@code configs} JSON property
+     */
+    @JsonCreator
+    public ConfigInfos(@JsonProperty("name") String name,
+                       @JsonProperty("error_count") int errorCount,
+                       @JsonProperty("groups") List<String> groups,
+                       @JsonProperty("configs") List<ConfigInfo> configs) {
+        this.name = name;
+        this.groups = groups;
+        this.errorCount = errorCount;
+        this.configs = configs;
+    }
+
+    /** @return the name passed to the constructor */
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    /** @return the group list passed to the constructor (same instance, not a copy) */
+    @JsonProperty
+    public List<String> groups() {
+        return groups;
+    }
+
+    /** @return the error count, exposed as the {@code error_count} JSON property */
+    @JsonProperty("error_count")
+    public int errorCount() {
+        return errorCount;
+    }
+
+    /**
+     * @return the config entries (same instance, not a copy); note the accessor is
+     *         named {@code values} while the JSON property is {@code configs}
+     */
+    @JsonProperty("configs")
+    public List<ConfigInfo> values() {
+        return configs;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and all four members are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigInfos that = (ConfigInfos) o;
+        // errorCount is a primitive, so compare it directly instead of boxing it.
+        return errorCount == that.errorCount &&
+               Objects.equals(name, that.name) &&
+               Objects.equals(groups, that.groups) &&
+               Objects.equals(configs, that.configs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, errorCount, groups, configs);
+    }
+
+    /**
+     * @return {@code [name,errorCount,groups,configs]}
+     */
+    @Override
+    public String toString() {
+        // Concatenation compiles to an unsynchronized builder; the synchronized
+        // StringBuffer bought nothing for a method-local value.
+        return "[" + name
+            + "," + errorCount
+            + "," + groups
+            + "," + configs
+            + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
new file mode 100644
index 0000000..f813709
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Immutable REST entity carrying the attributes of one configuration key.
+ * All values are stored exactly as supplied to the constructor.
+ */
+public class ConfigKeyInfo {
+
+    private final String name;
+    private final String type;
+    private final boolean required;
+    private final Object defaultValue;
+    private final String importance;
+    private final String documentation;
+    private final String group;
+    private final int orderInGroup;
+    private final String width;
+    private final String displayName;
+    private final List<String> dependents;
+
+    /**
+     * Stores the arguments as given; nothing is copied or validated.
+     *
+     * @param name          value of the {@code name} JSON property
+     * @param type          value of the {@code type} JSON property
+     * @param required      value of the {@code required} JSON property
+     * @param defaultValue  value of the {@code default_value} JSON property
+     * @param importance    value of the {@code importance} JSON property
+     * @param documentation value of the {@code documentation} JSON property
+     * @param group         value of the {@code group} JSON property
+     * @param orderInGroup  value of the {@code order_in_group} JSON property
+     * @param width         value of the {@code width} JSON property
+     * @param displayName   value of the {@code display_name} JSON property
+     * @param dependents    value of the {@code dependents} JSON property
+     */
+    @JsonCreator
+    public ConfigKeyInfo(@JsonProperty("name") String name,
+                         @JsonProperty("type") String type,
+                         @JsonProperty("required") boolean required,
+                         @JsonProperty("default_value") Object defaultValue,
+                         @JsonProperty("importance") String importance,
+                         @JsonProperty("documentation") String documentation,
+                         @JsonProperty("group") String group,
+                         @JsonProperty("order_in_group") int orderInGroup,
+                         @JsonProperty("width") String width,
+                         @JsonProperty("display_name") String displayName,
+                         @JsonProperty("dependents") List<String> dependents) {
+        this.name = name;
+        this.type = type;
+        this.required = required;
+        this.defaultValue = defaultValue;
+        this.importance = importance;
+        this.documentation = documentation;
+        this.group = group;
+        this.orderInGroup = orderInGroup;
+        this.width = width;
+        this.displayName = displayName;
+        this.dependents = dependents;
+    }
+
+    /** @return the {@code name} value */
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    /** @return the {@code type} value */
+    @JsonProperty
+    public String type() {
+        return type;
+    }
+
+    /** @return the {@code required} flag */
+    @JsonProperty
+    public boolean required() {
+        return required;
+    }
+
+    /** @return the default value, exposed as the {@code default_value} JSON property */
+    @JsonProperty("default_value")
+    public Object defaultValue() {
+        return defaultValue;
+    }
+
+    /** @return the {@code documentation} value */
+    @JsonProperty
+    public String documentation() {
+        return documentation;
+    }
+
+    /** @return the {@code group} value */
+    @JsonProperty
+    public String group() {
+        return group;
+    }
+
+    /**
+     * @return the order value, exposed as the {@code order} JSON property
+     */
+    // NOTE(review): the creator reads this value from "order_in_group" while this
+    // getter writes it as "order" -- confirm whether that asymmetry is intentional.
+    @JsonProperty("order")
+    public int orderInGroup() {
+        return orderInGroup;
+    }
+
+    /** @return the {@code width} value */
+    @JsonProperty
+    public String width() {
+        return width;
+    }
+
+    /** @return the {@code importance} value */
+    @JsonProperty
+    public String importance() {
+        return importance;
+    }
+
+    /** @return the display name, exposed as the {@code display_name} JSON property */
+    @JsonProperty("display_name")
+    public String displayName() {
+        return displayName;
+    }
+
+    /** @return the dependents list passed to the constructor (same instance, not a copy) */
+    @JsonProperty
+    public List<String> dependents() {
+        return dependents;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and every member is equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigKeyInfo that = (ConfigKeyInfo) o;
+        // Primitives are compared directly; only reference members need Objects.equals.
+        return required == that.required &&
+               orderInGroup == that.orderInGroup &&
+               Objects.equals(name, that.name) &&
+               Objects.equals(type, that.type) &&
+               Objects.equals(defaultValue, that.defaultValue) &&
+               Objects.equals(importance, that.importance) &&
+               Objects.equals(documentation, that.documentation) &&
+               Objects.equals(group, that.group) &&
+               Objects.equals(width, that.width) &&
+               Objects.equals(displayName, that.displayName) &&
+               Objects.equals(dependents, that.dependents);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, required, defaultValue, importance, documentation,
+                            group, orderInGroup, width, displayName, dependents);
+    }
+
+    /**
+     * @return all members in declaration order, comma-separated inside square brackets
+     */
+    @Override
+    public String toString() {
+        // Concatenation compiles to an unsynchronized builder; the synchronized
+        // StringBuffer bought nothing for a method-local value.
+        return "[" + name
+            + "," + type
+            + "," + required
+            + "," + defaultValue
+            + "," + importance
+            + "," + documentation
+            + "," + group
+            + "," + orderInGroup
+            + "," + width
+            + "," + displayName
+            + "," + dependents
+            + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
new file mode 100644
index 0000000..51e7ee5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * REST entity carrying the value-side attributes of one configuration entry:
+ * its name, value, recommended values, errors and a visibility flag.
+ * All values are stored exactly as supplied to the constructor.
+ */
+public class ConfigValueInfo {
+    // The fields are never reassigned after construction, so they are declared final.
+    private final String name;
+    private final Object value;
+    private final List<Object> recommendedValues;
+    private final List<String> errors;
+    private final boolean visible;
+
+    /**
+     * Stores the arguments as given; the lists are neither copied nor validated.
+     *
+     * @param name              value of the {@code name} JSON property
+     * @param value             value of the {@code value} JSON property
+     * @param recommendedValues value of the {@code recommended_values} JSON property
+     * @param errors            value of the {@code errors} JSON property
+     * @param visible           value of the {@code visible} JSON property
+     */
+    @JsonCreator
+    public ConfigValueInfo(
+        @JsonProperty("name") String name,
+        @JsonProperty("value") Object value,
+        @JsonProperty("recommended_values") List<Object> recommendedValues,
+        @JsonProperty("errors") List<String> errors,
+        @JsonProperty("visible") boolean visible) {
+        this.name = name;
+        this.value = value;
+        this.recommendedValues = recommendedValues;
+        this.errors = errors;
+        this.visible = visible;
+    }
+
+    /** @return the {@code name} value */
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    /** @return the {@code value} value */
+    @JsonProperty
+    public Object value() {
+        return value;
+    }
+
+    /** @return the recommended values, exposed as the {@code recommended_values} JSON property */
+    @JsonProperty("recommended_values")
+    public List<Object> recommendedValues() {
+        return recommendedValues;
+    }
+
+    /** @return the errors list passed to the constructor (same instance, not a copy) */
+    @JsonProperty
+    public List<String> errors() {
+        return errors;
+    }
+
+    /** @return the {@code visible} flag */
+    @JsonProperty
+    public boolean visible() {
+        return visible;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and every member is equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigValueInfo that = (ConfigValueInfo) o;
+        // visible is a primitive, so compare it directly instead of boxing it.
+        return visible == that.visible &&
+               Objects.equals(name, that.name) &&
+               Objects.equals(value, that.value) &&
+               Objects.equals(recommendedValues, that.recommendedValues) &&
+               Objects.equals(errors, that.errors);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, value, recommendedValues, errors, visible);
+    }
+
+    /**
+     * @return {@code [name,value,recommendedValues,errors,visible]}
+     */
+    @Override
+    public String toString() {
+        // Concatenation compiles to an unsynchronized builder; the synchronized
+        // StringBuffer bought nothing for a method-local value.
+        return "[" + name
+            + "," + value
+            + "," + recommendedValues
+            + "," + errors
+            + "," + visible
+            + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 8daae05..9567ef9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.ArrayList;
@@ -34,13 +35,15 @@ public class ConnectorInfo {
     private final List<ConnectorTaskId> tasks;
 
     @JsonCreator
-    public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config,
+    public ConnectorInfo(@JsonProperty("name") String name,
+                         @JsonProperty("config") Map<String, String> config,
                          @JsonProperty("tasks") List<ConnectorTaskId> tasks) {
         this.name = name;
         this.config = config;
         this.tasks = tasks;
     }
 
+
     @JsonProperty
     public String name() {
         return name;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
new file mode 100644
index 0000000..8439707
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/connector-plugins")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+/**
+ * JAX-RS resource rooted at {@code /connector-plugins} that consumes and produces JSON.
+ * Every request is handed to the {@link Herder} supplied at construction time.
+ */
+public class ConnectorPluginsResource {
+
+    private final Herder herder;
+
+    /**
+     * @param herder the herder that all requests are delegated to
+     */
+    public ConnectorPluginsResource(Herder herder) {
+        this.herder = herder;
+    }
+
+    /**
+     * Handles {@code PUT /connector-plugins/{connectorType}/config/validate} by passing
+     * the path parameter and the request body straight to {@code Herder.validateConfigs}.
+     *
+     * @param connType        value of the {@code connectorType} path segment
+     * @param connectorConfig request body as a string-to-string map
+     * @return whatever the herder returns for this type and configuration
+     * @throws Throwable anything raised by the herder call; nothing is caught here
+     */
+    @PUT
+    @Path("/{connectorType}/config/validate")
+    public ConfigInfos validateConfigs(@PathParam("connectorType") final String connType,
+                                       final Map<String, String> connectorConfig)
+        throws Throwable {
+        final ConfigInfos validated = herder.validateConfigs(connType, connectorConfig);
+        return validated;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index d0d940b..b6e9f61 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
@@ -32,6 +33,14 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -43,13 +52,6 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 @Path("/connectors")
 @Produces(MediaType.APPLICATION_JSON)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 707470f..9c48ed7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -49,7 +49,6 @@ import java.util.Set;
 public class StandaloneHerder extends AbstractHerder {
     private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
 
-    private final Worker worker;
     private HashMap<String, ConnectorState> connectors = new HashMap<>();
 
     public StandaloneHerder(Worker worker) {
@@ -60,8 +59,7 @@ public class StandaloneHerder extends AbstractHerder {
     StandaloneHerder(String workerId,
                      Worker worker,
                      StatusBackingStore statusBackingStore) {
-        super(statusBackingStore, workerId);
-        this.worker = worker;
+        super(worker, statusBackingStore, workerId);
     }
 
     public synchronized void start() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
index 0ab64fd..c2515a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.tools;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -61,4 +62,9 @@ public class VerifiableSinkConnector extends SourceConnector {
     @Override
     public void stop() {
     }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
index 5f9afd5..b18db6e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.tools;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -61,4 +62,9 @@ public class VerifiableSourceConnector extends SourceConnector {
     @Override
     public void stop() {
     }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index f17023c..1dc5784 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -34,6 +34,7 @@ public class AbstractHerderTest extends EasyMockSupport {
 
     @Test
     public void connectorStatus() {
+        Worker worker = null;
         String workerId = "workerId";
         String connector = "connector";
         int generation = 5;
@@ -42,8 +43,8 @@ public class AbstractHerderTest extends EasyMockSupport {
         StatusBackingStore store = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(StatusBackingStore.class, String.class)
-                .withArgs(store, workerId)
+                .withConstructor(Worker.class, StatusBackingStore.class, String.class)
+                .withArgs(worker, store, workerId)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -76,14 +77,15 @@ public class AbstractHerderTest extends EasyMockSupport {
 
     @Test
     public void taskStatus() {
+        Worker worker = null;
         ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
         String workerId = "workerId";
 
         StatusBackingStore store = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(StatusBackingStore.class, String.class)
-                .withArgs(store, workerId)
+                .withConstructor(Worker.class, StatusBackingStore.class, String.class)
+                .withArgs(worker, store, workerId)
                 .addMockedMethod("generation")
                 .createMock();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 67d3fdc..557d789 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -466,7 +467,11 @@ public class WorkerTest extends ThreadedTest {
 
 
     /* Name here needs to be unique as we are testing the aliasing mechanism */
-    private static class WorkerTestConnector extends Connector {
+    public static class WorkerTestConnector extends Connector {
+
+        private static final ConfigDef CONFIG_DEF  = new ConfigDef()
+            .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");
+
         @Override
         public String version() {
             return "1.0";
@@ -491,6 +496,11 @@ public class WorkerTest extends ThreadedTest {
         public void stop() {
 
         }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
     }
 
     private static class TestSourceTask extends SourceTask {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
new file mode 100644
index 0000000..625c91f
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.AbstractHerder;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RestServer.class)
+@PowerMockIgnore("javax.management.*")
+public class ConnectorPluginsResourceTest {
+
+    private static Map<String, String> props = new HashMap<>();
+    static {
+        props.put("test.string.config", "testString");
+        props.put("test.int.config", "10");
+    }
+
+    private static final ConfigInfos CONFIG_INFOS;
+    static {
+        List<ConfigInfo> configs = new LinkedList<>();
+
+        ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", new LinkedList<String>());
+        ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", null, -1, "NONE", "test.int.config", new LinkedList<String>());
+        configValueInfo = new ConfigValueInfo("test.int.config", 10, Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", new LinkedList<String>());
+        configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.<String>emptyList(), configs);
+    }
+
+    @Mock
+    private Herder herder;
+    private ConnectorPluginsResource connectorPluginsResource;
+
+    @Before
+    public void setUp() throws NoSuchMethodException {
+        PowerMock.mockStatic(RestServer.class,
+                             RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+        connectorPluginsResource = new ConnectorPluginsResource(herder);
+    }
+
+    @Test
+    public void testValidateConfig() throws Throwable {
+        herder.validateConfigs(EasyMock.eq(ConnectorPluginsResourceTestConnector.class.getName()), EasyMock.eq(props));
+
+        PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+            @Override
+            public ConfigInfos answer() {
+                Config config = new ConnectorPluginsResourceTestConnector().validate(props);
+                Connector connector = new ConnectorPluginsResourceTestConnector();
+                ConfigDef configDef = connector.config();
+                return AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), configDef.configKeys(), config.configValues(), configDef.groups());
+            }
+        });
+        PowerMock.replayAll();
+
+        ConfigInfos configInfos = connectorPluginsResource.validateConfigs(ConnectorPluginsResourceTestConnector.class.getName(), props);
+        assertEquals(CONFIG_INFOS.name(), configInfos.name());
+        assertEquals(CONFIG_INFOS.errorCount(), configInfos.errorCount());
+        assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
+        assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values()));
+
+        PowerMock.verifyAll();
+    }
+
+    /* Name here needs to be unique as we are testing the aliasing mechanism */
+    public static class ConnectorPluginsResourceTestConnector extends Connector {
+
+        public static final String TEST_STRING_CONFIG = "test.string.config";
+        public static final String TEST_INT_CONFIG = "test.int.config";
+        public static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.")
+            .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.")
+            .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.");
+
+        @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 4659ae8..970f56c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -83,7 +84,6 @@ public class ConnectorsResourceTest {
         TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
     }
 
-
     @Mock
     private Herder herder;
     private ConnectorsResource connectorsResource;
@@ -172,6 +172,8 @@ public class ConnectorsResourceTest {
         connectorsResource.createConnector(body);
 
         PowerMock.verifyAll();
+
+
     }
 
     @Test(expected = AlreadyExistsException.class)


Mime
View raw message