kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3935; Fix test_restart_failed_task system test for SinkTasks
Date Tue, 26 Jul 2016 02:02:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8a417c89d -> d1546960d


KAFKA-3935; Fix test_restart_failed_task system test for SinkTasks

Fix the test by using a more liberal timeout and forcing more frequent SinkTask.put() calls.
Also add some logging to aid future debugging.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1663 from ewencp/kafka-3935-fix-restart-system-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1546960
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1546960
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1546960

Branch: refs/heads/trunk
Commit: d1546960de0aa43989680a59c8f6b1ae7cb333e9
Parents: 8a417c8
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Jul 26 03:02:02 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jul 26 03:02:02 2016 +0100

----------------------------------------------------------------------
 .../kafka/connect/tools/MockConnector.java       |  7 +++++++
 .../apache/kafka/connect/tools/MockSinkTask.java | 19 ++++++++++++++++++-
 .../kafka/connect/tools/MockSourceTask.java      |  9 ++++++++-
 .../tests/connect/connect_distributed_test.py    |  2 +-
 4 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
index 919e896..51bb519 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -49,6 +51,8 @@ public class MockConnector extends Connector {
 
     public static final long DEFAULT_FAILURE_DELAY_MS = 15000;
 
+    private static final Logger log = LoggerFactory.getLogger(MockConnector.class);
+
     private Map<String, String> config;
     private ScheduledExecutorService executor;
 
@@ -69,10 +73,12 @@ public class MockConnector extends Connector {
             if (delayMsString != null)
                 delayMs = Long.parseLong(delayMsString);
 
+            log.debug("Started MockConnector with failure delay of {} ms", delayMs);
             executor = Executors.newSingleThreadScheduledExecutor();
             executor.schedule(new Runnable() {
                 @Override
                 public void run() {
+                    log.debug("Triggering connector failure");
                     context.raiseError(new RuntimeException());
                 }
             }, delayMs, TimeUnit.MILLISECONDS);
@@ -86,6 +92,7 @@ public class MockConnector extends Connector {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
+        log.debug("Creating single task for MockConnector");
         return Collections.singletonList(config);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
index 2e4b35e..b0de58d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
@@ -21,11 +21,14 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Map;
 
 public class MockSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class);
 
     private String mockMode;
     private long startTimeMs;
@@ -47,6 +50,9 @@ public class MockSinkTask extends SinkTask {
             this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
             if (delayMsString != null)
                 failureDelayMs = Long.parseLong(delayMsString);
+
+            log.debug("Started MockSinkTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs);
+            setTimeout();
         }
     }
 
@@ -54,8 +60,11 @@ public class MockSinkTask extends SinkTask {
     public void put(Collection<SinkRecord> records) {
         if (MockConnector.TASK_FAILURE.equals(mockMode)) {
             long now = System.currentTimeMillis();
-            if (now > startTimeMs + failureDelayMs)
+            if (now > startTimeMs + failureDelayMs) {
+                log.debug("Triggering sink task failure");
                 throw new RuntimeException();
+            }
+            setTimeout();
         }
     }
 
@@ -68,4 +77,12 @@ public class MockSinkTask extends SinkTask {
     public void stop() {
 
     }
+
+    private void setTimeout() {
+        // Set a reasonable minimum delay. Since this mock task may not actually consume any data from Kafka, it may only
+        // see put() calls triggered by wakeups for offset commits. To make sure we aren't tied to the offset commit
+        // interval, we force a wakeup every 250ms or after the failure delay, whichever is smaller. This is not overly
+        // aggressive but ensures any scheduled tasks this connector performs are reasonably close to the target time.
+        context.timeout(Math.min(failureDelayMs, 250));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
index eb896af..d7288f8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
@@ -19,12 +19,15 @@ package org.apache.kafka.connect.tools;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class MockSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class);
 
     private String mockMode;
     private long startTimeMs;
@@ -46,6 +49,8 @@ public class MockSourceTask extends SourceTask {
             this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
             if (delayMsString != null)
                 failureDelayMs = Long.parseLong(delayMsString);
+
+            log.debug("Started MockSourceTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs);
         }
     }
 
@@ -53,8 +58,10 @@ public class MockSourceTask extends SourceTask {
     public List<SourceRecord> poll() throws InterruptedException {
         if (MockConnector.TASK_FAILURE.equals(mockMode)) {
             long now = System.currentTimeMillis();
-            if (now > startTimeMs + failureDelayMs)
+            if (now > startTimeMs + failureDelayMs) {
+                log.debug("Triggering source task failure");
                 throw new RuntimeException();
+            }
         }
         return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index b9757ba..1902c59 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -171,7 +171,7 @@ class ConnectDistributedTest(Test):
         connector.start()
 
         task_id = 0
-        wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=15,
+        wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=20,
                    err_msg="Failed to see task transition to the FAILED state")
 
         self.cc.restart_task(connector.name, task_id)


Mime
View raw message