kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6253: Improve sink connector topic regex validation
Date Mon, 05 Feb 2018 17:46:26 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new dbd447f  KAFKA-6253: Improve sink connector topic regex validation
dbd447f is described below

commit dbd447f487622f6b501bf691cce140f3bccbc946
Author: Jeff Klukas <jeff@klukas.net>
AuthorDate: Mon Feb 5 09:46:07 2018 -0800

    KAFKA-6253: Improve sink connector topic regex validation
    
    KAFKA-3073 added topic regex support for sink connectors. That change requires that you
    specify exactly one of the `topics` or `topics.regex` settings. This was validated in only
    one place, and not when a connector is submitted. This PR adds validation at
    `AbstractHerder.validateConnectorConfig` and `WorkerConnector.initialize`.
    
    This adds a test of the new behavior to `AbstractHerderTest`.
    
    Author: Jeff Klukas <jeff@klukas.net>
    
    Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4251 from jklukas/connect-topics-validation
    
    (cherry picked from commit eb3fef760e1c876b936f175e0eb9a1446cf5bdcf)
    Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 10 +++--
 .../kafka/connect/runtime/SinkConnectorConfig.java | 33 ++++++++++++++-
 .../kafka/connect/runtime/WorkerConnector.java     |  3 ++
 .../kafka/connect/runtime/WorkerSinkTask.java      | 22 ++--------
 .../kafka/connect/runtime/AbstractHerderTest.java  | 16 ++++++++
 .../runtime/distributed/DistributedHerderTest.java |  8 ++--
 .../runtime/standalone/StandaloneHerderTest.java   | 47 ++++++++++++----------
 7 files changed, 92 insertions(+), 47 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 02465c9..b913f9e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -256,9 +256,13 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         Connector connector = getConnector(connType);
         ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
         try {
-            ConfigDef baseConfigDef = (connector instanceof SourceConnector)
-                    ? SourceConnectorConfig.configDef()
-                    : SinkConnectorConfig.configDef();
+            ConfigDef baseConfigDef;
+            if (connector instanceof SourceConnector) {
+                baseConfigDef = SourceConnectorConfig.configDef();
+            } else {
+                baseConfigDef = SinkConnectorConfig.configDef();
+                SinkConnectorConfig.validate(connectorProps);
+            }
             ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
             Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
                     connector,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index cf5564c..887a4da 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.transforms.util.RegexValidator;
@@ -34,7 +35,7 @@ public class SinkConnectorConfig extends ConnectorConfig {
     public static final String TOPICS_DEFAULT = "";
     private static final String TOPICS_DISPLAY = "Topics";
 
-    private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
+    public static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
     private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
         "Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
         "Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
@@ -52,4 +53,34 @@ public class SinkConnectorConfig extends ConnectorConfig {
     public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
         super(plugins, config, props);
     }
+
+    /**
+     * Throw an exception if the passed-in properties do not constitute a valid sink.
+     * @param props sink configuration properties
+     */
+    public static void validate(Map<String, String> props) {
+        final boolean hasTopicsConfig = hasTopicsConfig(props);
+        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
+
+        if (hasTopicsConfig && hasTopicsRegexConfig) {
+            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
+                " are mutually exclusive options, but both are set.");
+        }
+
+        if (!hasTopicsConfig && !hasTopicsRegexConfig) {
+            throw new ConfigException("Must configure one of " +
+                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+        }
+    }
+
+    public static boolean hasTopicsConfig(Map<String, String> props) {
+        String topicsStr = props.get(TOPICS_CONFIG);
+        return topicsStr != null && !topicsStr.trim().isEmpty();
+    }
+
+    public static boolean hasTopicsRegexConfig(Map<String, String> props) {
+        String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
+        return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
+    }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9b934f3..611e196 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -77,6 +77,9 @@ public class WorkerConnector {
         try {
             this.config = connectorConfig.originalsStrings();
             log.debug("{} Initializing connector {} with config {}", this, connName, config);
+            if (isSinkConnector()) {
+                SinkConnectorConfig.validate(config);
+            }
 
             connector.initialize(new ConnectorContext() {
                 @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 85695bb..5aeb851 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -265,27 +264,14 @@ class WorkerSinkTask extends WorkerTask {
      * Initializes and starts the SinkTask.
      */
     protected void initializeAndStart() {
-        String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
-        boolean topicsStrPresent = topicsStr != null && !topicsStr.trim().isEmpty();
+        SinkConnectorConfig.validate(taskConfig);
 
-        String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
-        boolean topicsRegexStrPresent = topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
-
-        if (topicsStrPresent && topicsRegexStrPresent) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
-                " are mutually exclusive options, but both are set.");
-        }
-
-        if (!topicsStrPresent && !topicsRegexStrPresent) {
-            throw new ConfigException("Must configure one of " +
-                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
-        }
-
-        if (topicsStrPresent) {
-            String[] topics = topicsStr.split(",");
+        if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
+            String[] topics = taskConfig.get(SinkTask.TOPICS_CONFIG).split(",");
             consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
             log.debug("{} Initializing and starting task for topics {}", this, topics);
         } else {
+            String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
             Pattern pattern = Pattern.compile(topicsRegexStr);
             consumer.subscribe(pattern, new HandleRebalance());
             log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index dac1392..0718eb1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -183,6 +184,21 @@ public class AbstractHerderTest {
         verifyAll();
     }
 
+    @Test(expected = ConfigException.class)
+    public void testConfigValidationInvalidTopics() {
+        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class);
+        replayAll();
+
+        Map<String, String> config = new HashMap();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
+        config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
+
+        herder.validateConnectorConfig(config);
+
+        verifyAll();
+    }
+
     @Test()
     public void testConfigValidationTransformsExtendResults() {
         AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d7307cf..d7a7d87 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -355,7 +355,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -398,7 +398,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -443,7 +443,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -1338,7 +1338,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
         // config validation
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 79be45b..fd330f2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -125,7 +125,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         PowerMock.replayAll();
 
@@ -142,7 +143,7 @@ public class StandaloneHerderTest {
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -167,7 +168,7 @@ public class StandaloneHerderTest {
     public void testCreateConnectorFailedCustomValidation() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@@ -199,7 +200,7 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
@@ -224,7 +225,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SINK);
 
         Map<String, String> config = connectorConfig(SourceSink.SINK);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        expectConfigValidation(connectorMock, true, config);
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
@@ -238,7 +240,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
@@ -270,7 +273,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
@@ -295,7 +299,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
@@ -326,7 +331,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
@@ -351,7 +357,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
@@ -381,7 +388,8 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(connectorConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
 
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
@@ -402,6 +410,7 @@ public class StandaloneHerderTest {
     @Test
     public void testAccessors() throws Exception {
         Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
+        System.out.println(connConfig);
 
         Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
         Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
@@ -421,7 +430,8 @@ public class StandaloneHerderTest {
         // Create connector
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        expectConfigValidation(connConfig);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connector, true, connConfig);
 
         // Validate accessors with 1 connector
         listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
@@ -467,7 +477,7 @@ public class StandaloneHerderTest {
         // Create
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, connConfig);
 
         // Should get first config
@@ -526,7 +536,8 @@ public class StandaloneHerderTest {
         Map<String, String> config = new HashMap<>();
         config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName());
-        Connector connectorMock = PowerMock.createMock(Connector.class);
+        config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
         String error = "This is an error in your config!";
         List<String> errors = new ArrayList<>(singletonList(error));
         String key = "foo.invalid.key";
@@ -592,7 +603,7 @@ public class StandaloneHerderTest {
         EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
             .andReturn(ConnectorType.SOURCE).anyTimes();
         EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
-        .andReturn(ConnectorType.SINK).anyTimes();
+            .andReturn(ConnectorType.SINK).anyTimes();
         worker.isSinkConnector(CONNECTOR_NAME);
         PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
     }
@@ -631,12 +642,6 @@ public class StandaloneHerderTest {
         return generatedTaskProps;
     }
 
-
-    private void expectConfigValidation(Map<String, String> ... configs) {
-        Connector connectorMock = PowerMock.createMock(Connector.class);
-        expectConfigValidation(connectorMock, true, configs);
-    }
-
     private void expectConfigValidation(
             Connector connectorMock,
             boolean shouldCreateConnector,

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.

Mime
View raw message