kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5412: Using connect-console-sink/source.properties raises an exception related to "file" property not found
Date Mon, 19 Jun 2017 16:23:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b836bd18f -> 198a43d84


KAFKA-5412: Using connect-console-sink/source.properties raises an exception related to "file" property not found

Author: ppatierno <ppatierno@live.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3279 from ppatierno/kafka-5412


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/198a43d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/198a43d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/198a43d8

Branch: refs/heads/trunk
Commit: 198a43d84693153c0f0177b8314117525c05047b
Parents: b836bd1
Author: ppatierno <ppatierno@live.com>
Authored: Mon Jun 19 09:22:19 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jun 19 09:22:19 2017 -0700

----------------------------------------------------------------------
 .../connect/file/FileStreamSinkConnector.java    |  2 +-
 .../connect/file/FileStreamSourceConnector.java  |  2 +-
 .../file/FileStreamSinkConnectorTest.java        | 14 ++++++++++++++
 .../connect/file/FileStreamSourceTaskTest.java   | 19 +++++++++++++++++++
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index 449b9b1..4ae7f4b 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -36,7 +36,7 @@ public class FileStreamSinkConnector extends SinkConnector {
 
     public static final String FILE_CONFIG = "file";
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename.");
+        .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used");
 
     private String filename;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index bf79b8a..335fe92 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -38,7 +38,7 @@ public class FileStreamSourceConnector extends SourceConnector {
     public static final String FILE_CONFIG = "file";
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+        .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
         .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
 
     private String filename;

http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
index 660a44c..aead7ef 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class FileStreamSinkConnectorTest {
 
@@ -74,6 +75,19 @@ public class FileStreamSinkConnectorTest {
     }
 
     @Test
+    public void testSinkTasksStdout() {
+        PowerMock.replayAll();
+
+        sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        connector.start(sinkProperties);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testTaskClass() {
         PowerMock.replayAll();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index e8637f2..eb91dbd 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -26,6 +26,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.powermock.api.easymock.PowerMock;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -133,6 +134,24 @@ public class FileStreamSourceTaskTest {
         task.start(config);
     }
 
+    @Test
+    public void testMissingFile() throws InterruptedException {
+        replay();
+
+        String data = "line\n";
+        System.setIn(new ByteArrayInputStream(data.getBytes()));
+
+        config.remove(FileStreamSourceConnector.FILE_CONFIG);
+        task.start(config);
+
+        List<SourceRecord> records = task.poll();
+        assertEquals(1, records.size());
+        assertEquals(TOPIC, records.get(0).topic());
+        assertEquals("line", records.get(0).value());
+
+        task.stop();
+    }
+
     public void testInvalidFile() throws InterruptedException {
         config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
         task.start(config);


Mime
View raw message