kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4575: ensure topic created before starting sink for ConnectDistributedTest.test_pause_resume_sink
Date Tue, 28 Mar 2017 00:46:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 8814c2118 -> aa60a92c0


KAFKA-4575: ensure topic created before starting sink for ConnectDistributedTest.test_pause_resume_sink

Otherwise, in this test the sink task goes through the pause/resume cycle with 0 assigned partitions,
since the default metadata refresh interval is quite long.

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2313 from shikhar/kafka-4575


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa60a92c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa60a92c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa60a92c

Branch: refs/heads/0.10.0
Commit: aa60a92c062b07fd619142f91d10e23c7c4ede6c
Parents: 8814c21
Author: Shikhar Bhushan <shikhar@confluent.io>
Authored: Thu Jan 5 15:25:00 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Mar 27 10:07:13 2017 -0700

----------------------------------------------------------------------
 .../connect/tools/VerifiableSourceTask.java      | 19 +++++++++++++++++++
 tests/kafkatest/services/connect.py              |  6 ++++++
 .../tests/connect/connect_distributed_test.py    | 17 ++++++++++-------
 3 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa60a92c/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 6dcfdc4..0436265 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -122,6 +122,25 @@ public class VerifiableSourceTask extends SourceTask {
     }
 
     @Override
+    public void commitRecord(SourceRecord record) throws InterruptedException {
+        Map<String, Object> data = new HashMap<>();
+        data.put("name", name);
+        data.put("task", id);
+        data.put("topic", this.topic);
+        data.put("time_ms", System.currentTimeMillis());
+        data.put("seqno", record.value());
+        data.put("committed", true);
+
+        String dataJson;
+        try {
+            dataJson = JSON_SERDE.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            dataJson = "Bad data can't be written as json: " + e.getMessage();
+        }
+        System.out.println(dataJson);
+    }
+
+    @Override
     public void stop() {
         throttler.wakeup();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa60a92c/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index ebc19b0..3d87e4d 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -318,6 +318,12 @@ class VerifiableSource(VerifiableConnector):
         self.topic = topic
         self.throughput = throughput
 
+    def committed_messages(self):
+        return filter(lambda m: 'committed' in m and m['committed'], self.messages())
+
+    def sent_messages(self):
+        return filter(lambda m: 'committed' not in m or not m['committed'], self.messages())
+
     def start(self):
         self.logger.info("Creating connector VerifiableSourceConnector %s", self.name)
         self.cc.create_connector({

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa60a92c/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 1902c59..a7433fa 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -204,9 +204,9 @@ class ConnectDistributedTest(Test):
                        err_msg="Failed to see connector transition to the PAUSED state")
 
         # verify that we do not produce new messages while paused
-        num_messages = len(self.source.messages())
+        num_messages = len(self.source.sent_messages())
         time.sleep(10)
-        assert num_messages == len(self.source.messages()), "Paused source connector should not produce any messages"
+        assert num_messages == len(self.source.sent_messages()), "Paused source connector should not produce any messages"
 
         self.cc.resume_connector(self.source.name)
 
@@ -215,7 +215,7 @@ class ConnectDistributedTest(Test):
                        err_msg="Failed to see connector transition to the RUNNING state")
 
         # after resuming, we should see records produced again
-        wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30,
+        wait_until(lambda: len(self.source.sent_messages()) > num_messages, timeout_sec=30,
                    err_msg="Failed to produce messages after resuming source connector")
 
     def test_pause_and_resume_sink(self):
@@ -232,6 +232,9 @@ class ConnectDistributedTest(Test):
         self.source = VerifiableSource(self.cc)
         self.source.start()
 
+        wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30,
+                   err_msg="Timeout expired waiting for source task to produce a message")
+
         self.sink = VerifiableSink(self.cc)
         self.sink.start()
 
@@ -258,7 +261,7 @@ class ConnectDistributedTest(Test):
 
         # after resuming, we should see records consumed again
         wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
-                   err_msg="Failed to consume messages after resuming source connector")
+                   err_msg="Failed to consume messages after resuming sink connector")
 
 
     def test_pause_state_persistent(self):
@@ -365,8 +368,8 @@ class ConnectDistributedTest(Test):
         success = True
         errors = []
         allow_dups = not clean
-        src_messages = self.source.messages()
-        sink_messages = self.sink.messages()
+        src_messages = self.source.committed_messages()
+        sink_messages = self.sink.flushed_messages()
         for task in range(num_tasks):
             # Validate source messages
             src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task]
@@ -389,7 +392,7 @@ class ConnectDistributedTest(Test):
 
 
             # Validate sink messages
-            sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task and 'flushed' in msg]
+            sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task]
             # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because
             # clean bouncing should commit on rebalance.
             sink_seqno_max = max(sink_seqnos)


Mime
View raw message