Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 46F48200CAC for ; Mon, 19 Jun 2017 18:23:49 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 054AE160BE1; Mon, 19 Jun 2017 16:23:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 255EE160BD5 for ; Mon, 19 Jun 2017 18:23:47 +0200 (CEST) Received: (qmail 62696 invoked by uid 500); 19 Jun 2017 16:23:47 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 62687 invoked by uid 99); 19 Jun 2017 16:23:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Jun 2017 16:23:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3362ADFAF5; Mon, 19 Jun 2017 16:23:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ewencp@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-5412: Using connect-console-sink/source.properties raises an exception related to "file" property not found Date: Mon, 19 Jun 2017 16:23:47 +0000 (UTC) archived-at: Mon, 19 Jun 2017 16:23:49 -0000 Repository: kafka Updated Branches: refs/heads/trunk b836bd18f -> 198a43d84 KAFKA-5412: Using connect-console-sink/source.properties raises an exception related to "file" property not found Author: ppatierno Reviewers: Randall Hauch , Ewen Cheslack-Postava Closes #3279 from ppatierno/kafka-5412 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/198a43d8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/198a43d8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/198a43d8 Branch: refs/heads/trunk Commit: 198a43d84693153c0f0177b8314117525c05047b Parents: b836bd1 Author: ppatierno Authored: Mon Jun 19 09:22:19 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Mon Jun 19 09:22:19 2017 -0700 ---------------------------------------------------------------------- .../connect/file/FileStreamSinkConnector.java | 2 +- .../connect/file/FileStreamSourceConnector.java | 2 +- .../file/FileStreamSinkConnectorTest.java | 14 ++++++++++++++ .../connect/file/FileStreamSourceTaskTest.java | 19 +++++++++++++++++++ 4 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java ---------------------------------------------------------------------- diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java index 449b9b1..4ae7f4b 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java @@ -36,7 +36,7 @@ public class FileStreamSinkConnector extends SinkConnector { public static final String FILE_CONFIG = "file"; private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename."); + .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used"); private String filename; http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java ---------------------------------------------------------------------- diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java index bf79b8a..335fe92 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java @@ -38,7 +38,7 @@ public class FileStreamSourceConnector extends SourceConnector { public static final String FILE_CONFIG = "file"; private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.") + .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used") .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to"); private String filename; http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java index 660a44c..aead7ef 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class FileStreamSinkConnectorTest { @@ -74,6 +75,19 @@ public class FileStreamSinkConnectorTest { } @Test + public void testSinkTasksStdout() { + PowerMock.replayAll(); + + sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + connector.start(sinkProperties); + List> taskConfigs = connector.taskConfigs(1); + assertEquals(1, taskConfigs.size()); + assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); + + PowerMock.verifyAll(); + } + + @Test public void testTaskClass() { PowerMock.replayAll(); http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java index e8637f2..eb91dbd 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java @@ -26,6 +26,7 @@ import org.junit.Before; import org.junit.Test; import org.powermock.api.easymock.PowerMock; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -133,6 +134,24 @@ public class FileStreamSourceTaskTest { task.start(config); } + @Test + public void testMissingFile() throws InterruptedException { + replay(); + + String data = "line\n"; + System.setIn(new ByteArrayInputStream(data.getBytes())); + + config.remove(FileStreamSourceConnector.FILE_CONFIG); + task.start(config); + + List records = task.poll(); + assertEquals(1, records.size()); + assertEquals(TOPIC, records.get(0).topic()); + assertEquals("line", records.get(0).value()); + + task.stop(); + } + public void testInvalidFile() throws InterruptedException { config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename"); task.start(config);