kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7790: Fix Bugs in Trogdor Task Expiration (#6103)
Date Fri, 11 Jan 2019 21:38:10 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 625e0d8  KAFKA-7790: Fix Bugs in Trogdor Task Expiration (#6103)
625e0d8 is described below

commit 625e0d882944f74568464037c3a8aafb585079f2
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Fri Jan 11 23:38:00 2019 +0200

    KAFKA-7790: Fix Bugs in Trogdor Task Expiration (#6103)
    
    The Trogdor Coordinator now overwrites a task's startMs with the time at which it
received the task, if the submitted startMs is in the past.
    
    The Trogdor Agent now correctly expires a task once the expiry time (startMs + durationMs)
passes. Previously, it would ignore startMs and expire the task durationMs milliseconds
after the task's local start.
    
    Reviewed-by: Colin P. McCabe <cmccabe@apache.org>
---
 .../apache/kafka/trogdor/agent/WorkerManager.java  |   9 +-
 .../kafka/trogdor/coordinator/NodeManager.java     |  84 ++++++++------
 .../kafka/trogdor/coordinator/TaskManager.java     |  34 ++++--
 .../org/apache/kafka/trogdor/task/TaskSpec.java    |   7 ++
 .../org/apache/kafka/trogdor/agent/AgentTest.java  |  71 +++++++++---
 .../kafka/trogdor/common/MiniTrogdorCluster.java   |  23 +++-
 .../kafka/trogdor/coordinator/CoordinatorTest.java | 125 +++++++++++++++++++--
 7 files changed, 282 insertions(+), 71 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index ef02716..bf3d293 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -280,7 +280,8 @@ public final class WorkerManager {
         void transitionToRunning() {
             state = State.RUNNING;
             timeoutFuture = scheduler.schedule(stateChangeExecutor,
-                new StopWorker(workerId, false), spec.durationMs());
+                new StopWorker(workerId, false),
+                Math.max(0, spec.endMs() - time.milliseconds()));
         }
 
         void transitionToStopping() {
@@ -316,6 +317,12 @@ public final class WorkerManager {
                     "a worker with that id.", nodeName, workerId);
                 return;
             }
+            if (worker.spec.endMs() <= time.milliseconds()) {
+                log.info("{}: Will not run worker {} as it has expired.", nodeName, worker);
+                stateChangeExecutor.submit(new HandleWorkerHalting(worker,
+                    "worker expired", true));
+                return;
+            }
             KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
             haltFuture.thenApply((KafkaFuture.BaseFunction<String, Void>) errorString
-> {
                 if (errorString == null)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 3f0075e..97ad4ae 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -49,10 +49,12 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerReceiving;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStarting;
 import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.rest.WorkerStopping;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,46 +204,58 @@ public final class NodeManager {
                 if (log.isTraceEnabled()) {
                     log.trace("{}: got heartbeat status {}", node.name(), agentStatus);
                 }
-                // Identify workers which we think should be running, but which do not appear
-                // in the agent's response.  We need to send startWorker requests for these.
-                for (Map.Entry<Long, ManagedWorker> entry : workers.entrySet()) {
-                    Long workerId = entry.getKey();
-                    if (!agentStatus.workers().containsKey(workerId)) {
-                        ManagedWorker worker = entry.getValue();
-                        if (worker.shouldRun) {
-                            worker.tryCreate();
-                        }
+                handleMissingWorkers(agentStatus);
+                handlePresentWorkers(agentStatus);
+            } catch (Throwable e) {
+                log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(),
e);
+            }
+        }
+
+        /**
+         * Identify workers which we think should be running but do not appear in the agent's
response.
+         * We need to send startWorker requests for those
+         */
+        private void handleMissingWorkers(AgentStatusResponse agentStatus) {
+            for (Map.Entry<Long, ManagedWorker> entry : workers.entrySet()) {
+                Long workerId = entry.getKey();
+                if (!agentStatus.workers().containsKey(workerId)) {
+                    ManagedWorker worker = entry.getValue();
+                    if (worker.shouldRun) {
+                        worker.tryCreate();
                     }
                 }
-                for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet())
{
-                    long workerId = entry.getKey();
-                    WorkerState state = entry.getValue();
-                    ManagedWorker worker = workers.get(workerId);
-                    if (worker == null) {
-                        // Identify tasks which are running, but which we don't know about.
-                        // Add these to the NodeManager as tasks that should not be running.
-                        log.warn("{}: scheduling unknown worker with ID {} for stopping.",
node.name(), workerId);
-                        workers.put(workerId, new ManagedWorker(workerId, state.taskId(),
-                            state.spec(), false, state));
-                    } else {
-                        // Handle workers which need to be stopped.
-                        if (state instanceof WorkerStarting || state instanceof WorkerRunning)
{
-                            if (!worker.shouldRun) {
-                                worker.tryStop();
-                            }
-                        }
-                        // Notify the TaskManager if the worker state has changed.
-                        if (worker.state.equals(state)) {
-                            log.debug("{}: worker state is still {}", node.name(), worker.state);
-                        } else {
-                            log.info("{}: worker state changed from {} to {}", node.name(),
worker.state, state);
-                            worker.state = state;
-                            taskManager.updateWorkerState(node.name(), worker.workerId, state);
+            }
+        }
+
+        private void handlePresentWorkers(AgentStatusResponse agentStatus) {
+            for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet())
{
+                long workerId = entry.getKey();
+                WorkerState state = entry.getValue();
+                ManagedWorker worker = workers.get(workerId);
+                if (worker == null) {
+                    // Identify tasks which are running, but which we don't know about.
+                    // Add these to the NodeManager as tasks that should not be running.
+                    log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(),
workerId);
+                    workers.put(workerId, new ManagedWorker(workerId, state.taskId(),
+                        state.spec(), false, state));
+                } else {
+                    // Handle workers which need to be stopped.
+                    if (state instanceof WorkerStarting || state instanceof WorkerRunning)
{
+                        if (!worker.shouldRun) {
+                            worker.tryStop();
                         }
                     }
+                    // Notify the TaskManager if the worker state has changed.
+                    if (worker.state.equals(state)) {
+                        log.debug("{}: worker state is still {}", node.name(), worker.state);
+                    } else {
+                        log.info("{}: worker state changed from {} to {}", node.name(), worker.state,
state);
+                        if (state instanceof WorkerDone || state instanceof WorkerStopping)
+                            worker.shouldRun = false;
+                        worker.state = state;
+                        taskManager.updateWorkerState(node.name(), worker.workerId, state);
+                    }
                 }
-            } catch (Throwable e) {
-                log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(),
e);
             }
         }
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 18ff9cb..941656e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.trogdor.coordinator;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.LongNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -150,7 +152,13 @@ public final class TaskManager {
         final private String id;
 
         /**
-         * The task specification.
+         * The original task specification as submitted when the task was created.
+         */
+        final private TaskSpec originalSpec;
+
+        /**
+         * The effective task specification.
+         * The start time will be adjusted to reflect the time when the task was submitted.
          */
         final private TaskSpec spec;
 
@@ -195,8 +203,10 @@ public final class TaskManager {
          */
         private String error = "";
 
-        ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state)
{
+        ManagedTask(String id, TaskSpec originalSpec, TaskSpec spec,
+                    TaskController controller, TaskStateType state) {
             this.id = id;
+            this.originalSpec = originalSpec;
             this.spec = spec;
             this.controller = controller;
             this.state = state;
@@ -297,7 +307,7 @@ public final class TaskManager {
             throws Throwable {
         try {
             executor.submit(new CreateTask(id, spec)).get();
-        } catch (ExecutionException e) {
+        } catch (ExecutionException | JsonProcessingException e) {
             log.info("createTask(id={}, spec={}) error", id, spec, e);
             throw e.getCause();
         }
@@ -308,11 +318,15 @@ public final class TaskManager {
      */
     class CreateTask implements Callable<Void> {
         private final String id;
+        private final TaskSpec originalSpec;
         private final TaskSpec spec;
 
-        CreateTask(String id, TaskSpec spec) {
+        CreateTask(String id, TaskSpec spec) throws JsonProcessingException {
             this.id = id;
-            this.spec = spec;
+            this.originalSpec = spec;
+            ObjectNode node = JsonUtil.JSON_SERDE.valueToTree(originalSpec);
+            node.set("startMs", new LongNode(Math.max(time.milliseconds(), originalSpec.startMs())));
+            this.spec = JsonUtil.JSON_SERDE.treeToValue(node, TaskSpec.class);
         }
 
         @Override
@@ -322,11 +336,11 @@ public final class TaskManager {
             }
             ManagedTask task = tasks.get(id);
             if (task != null) {
-                if (!task.spec.equals(spec)) {
+                if (!task.originalSpec.equals(originalSpec)) {
                     throw new RequestConflictException("Task ID " + id + " already " +
-                        "exists, and has a different spec " + task.spec);
+                        "exists, and has a different spec " + task.originalSpec);
                 }
-                log.info("Task {} already exists with spec {}", id, spec);
+                log.info("Task {} already exists with spec {}", id, originalSpec);
                 return null;
             }
             TaskController controller = null;
@@ -339,13 +353,13 @@ public final class TaskManager {
             if (failure != null) {
                 log.info("Failed to create a new task {} with spec {}: {}",
                     id, spec, failure);
-                task = new ManagedTask(id, spec, null, TaskStateType.DONE);
+                task = new ManagedTask(id, originalSpec, spec, null, TaskStateType.DONE);
                 task.doneMs = time.milliseconds();
                 task.maybeSetError(failure);
                 tasks.put(id, task);
                 return null;
             }
-            task = new ManagedTask(id, spec, controller, TaskStateType.PENDING);
+            task = new ManagedTask(id, originalSpec, spec, controller, TaskStateType.PENDING);
             tasks.put(id, task);
             long delayMs = task.startDelayMs(time.milliseconds());
             task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
index af7a76f..acb19f6 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
@@ -67,6 +67,13 @@ public abstract class TaskSpec {
     }
 
     /**
+     * Get the deadline time of this task in ms
+     */
+    public final long endMs() {
+        return startMs + durationMs;
+    }
+
+    /**
      * Get the duration of this task in ms.
      */
     @JsonProperty
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index f0ea475..6c20083 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.task.NoOpTaskSpec;
@@ -110,6 +111,43 @@ public class AgentTest {
     }
 
     @Test
+    public void testCreateExpiredWorkerIsNotScheduled() throws Exception {
+        long initialTimeMs = 100;
+        long tickMs = 15;
+        final boolean[] toSleep = {true};
+        MockTime time = new MockTime(tickMs, initialTimeMs, 0) {
+            /**
+             * Modify sleep() to call super.sleep() every second call
+             * in order to avoid the endless loop in the tick() calls to the MockScheduler
listener
+             */
+            @Override
+            public void sleep(long ms) {
+                toSleep[0] = !toSleep[0];
+                if (toSleep[0])
+                    super.sleep(ms);
+            }
+        };
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
+        AgentStatusResponse status = client.status();
+        assertEquals(Collections.emptyMap(), status.workers());
+        new ExpectedTasks().waitFor(client);
+
+        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 10);
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
+        long actualStartTimeMs = initialTimeMs + tickMs;
+        long doneMs = actualStartTimeMs + 2 * tickMs;
+        new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
+            workerState(new WorkerDone("foo", fooSpec, actualStartTimeMs,
+                doneMs, null, "worker expired")).
+            taskState(new TaskDone(fooSpec, actualStartTimeMs, doneMs, "worker expired",
false, null)).
+            build()).
+            waitFor(client);
+    }
+
+    @Test
     public void testAgentCreateWorkers() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         MockScheduler scheduler = new MockScheduler(time);
@@ -171,53 +209,58 @@ public class AgentTest {
 
     @Test
     public void testAgentFinishesTasks() throws Exception {
-        MockTime time = new MockTime(0, 0, 0);
+        long startTimeMs = 2000;
+        MockTime time = new MockTime(0, startTimeMs, 0);
         MockScheduler scheduler = new MockScheduler(time);
         Agent agent = createAgent(scheduler);
         AgentClient client = new AgentClient.Builder().
             maxTries(10).target("localhost", agent.port()).build();
         new ExpectedTasks().waitFor(client);
 
-        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2);
+        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(startTimeMs, 2);
+        long fooSpecStartTimeMs = startTimeMs;
         client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, startTimeMs, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(1);
 
-        final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
-        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
+        long barSpecWorkerId = 1;
+        long barSpecStartTimeMs = startTimeMs + 1;
+        final NoOpTaskSpec barSpec = new NoOpTaskSpec(startTimeMs, 900000);
+        client.createWorker(new CreateWorkerRequest(barSpecWorkerId, "bar", barSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, fooSpecStartTimeMs, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(1);
 
+        // foo task expired
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
+                workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs
+ 2, new TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(5);
-        client.stopWorker(new StopWorkerRequest(1));
+        client.stopWorker(new StopWorkerRequest(barSpecWorkerId));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
+                workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs
+ 2, new TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone("bar", barSpec, 1, 7, new TextNode("done"), "")).
+                workerState(new WorkerDone("bar", barSpec, barSpecStartTimeMs, startTimeMs
+ 7, new TextNode("done"), "")).
                 build()).
             waitFor(client);
 
@@ -348,7 +391,7 @@ public class AgentTest {
             maxTries(10).target("localhost", agent.port()).build();
         new ExpectedTasks().waitFor(client);
 
-        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 5);
+        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(0, 5);
         client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
@@ -363,7 +406,7 @@ public class AgentTest {
         new ExpectedTasks().waitFor(client);
         time.sleep(1);
 
-        final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(100, 1);
+        final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(2, 1);
         client.createWorker(new CreateWorkerRequest(1, "foo", fooSpec2));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index 9edffaa..0c9aba2 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -210,17 +210,25 @@ public class MiniTrogdorCluster implements AutoCloseable {
                     coordinator = node.coordinator;
                 }
             }
-            return new MiniTrogdorCluster(agents, coordinator);
+            return new MiniTrogdorCluster(scheduler, agents, nodes, coordinator);
         }
     }
 
     private final TreeMap<String, Agent> agents;
 
+    private final TreeMap<String, Builder.NodeData> nodesByAgent;
+
     private final Coordinator coordinator;
 
-    private MiniTrogdorCluster(TreeMap<String, Agent> agents,
+    private final Scheduler scheduler;
+
+    private MiniTrogdorCluster(Scheduler scheduler,
+                               TreeMap<String, Agent> agents,
+                               TreeMap<String, Builder.NodeData> nodesByAgent,
                                Coordinator coordinator) {
+        this.scheduler = scheduler;
         this.agents = agents;
+        this.nodesByAgent = nodesByAgent;
         this.coordinator = coordinator;
     }
 
@@ -242,6 +250,17 @@ public class MiniTrogdorCluster implements AutoCloseable {
             build();
     }
 
+    /**
+     * Mimic a restart of a Trogdor agent, essentially cleaning out all of its active workers
+     */
+    public void restartAgent(String nodeName) {
+        if (!agents.containsKey(nodeName)) {
+            throw new RuntimeException("There is no agent on node " + nodeName);
+        }
+        Builder.NodeData node = nodesByAgent.get(nodeName);
+        agents.put(nodeName, new Agent(node.platform, scheduler, node.agentRestServer, node.agentRestResource));
+    }
+
     public AgentClient agentClient(String nodeName) {
         Agent agent = agents.get(nodeName);
         if (agent == null) {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index db1afac..0247951 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -154,7 +154,7 @@ public class CoordinatorTest {
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
-            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 7);
             coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").taskState(
@@ -176,15 +176,15 @@ public class CoordinatorTest {
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
-            time.sleep(2);
+            time.sleep(7);
             ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
             status2.set("node01", new TextNode("done"));
             status2.set("node02", new TextNode("done"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 11, 13,
+                    taskState(new TaskDone(fooSpec, 11, 18,
                         "", false, status2)).
-                    workerState(new WorkerDone("foo", fooSpec, 11, 13, new TextNode("done"),
"")).
+                    workerState(new WorkerDone("foo", fooSpec, 11, 18, new TextNode("done"),
"")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -211,7 +211,7 @@ public class CoordinatorTest {
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
-            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 7);
             coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
@@ -236,13 +236,13 @@ public class CoordinatorTest {
             ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
             status2.set("node01", new TextNode("done"));
             status2.set("node02", new TextNode("done"));
-            time.sleep(1);
+            time.sleep(7);
             coordinatorClient.stopTask(new StopTaskRequest("foo"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 11, 12, "",
+                    taskState(new TaskDone(fooSpec, 11, 18, "",
                         true, status2)).
-                    workerState(new WorkerDone("foo", fooSpec, 11, 12, new TextNode("done"),
"")).
+                    workerState(new WorkerDone("foo", fooSpec, 11, 18, new TextNode("done"),
"")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -275,7 +275,7 @@ public class CoordinatorTest {
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
-            NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 2);
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 12);
             coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
             coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
             NoOpTaskSpec barSpec = new NoOpTaskSpec(20, 20);
@@ -363,12 +363,15 @@ public class CoordinatorTest {
     @Test
     public void testNetworkPartitionFault() throws Exception {
         CapturingCommandRunner runner = new CapturingCommandRunner();
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
         try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                 addCoordinator("node01").
                 addAgent("node01").
                 addAgent("node02").
                 addAgent("node03").
                 commandRunner(runner).
+                scheduler(scheduler).
                 build()) {
             CoordinatorClient coordinatorClient = cluster.coordinatorClient();
             NetworkPartitionFaultSpec spec = new NetworkPartitionFaultSpec(0, Long.MAX_VALUE,
@@ -496,6 +499,110 @@ public class CoordinatorTest {
         }
     }
 
+    /**
+     * If an agent fails in the middle of a task and comes back up when the task is considered
expired,
+     * we want the task to be marked as DONE and not re-sent should a second failure happen.
+     */
+    @Test
+    public void testAgentFailureAndTaskExpiry() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 500);
+            coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+            TaskState expectedState = new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build().taskState();
+
+            TaskState resp = coordinatorClient.task(new TaskRequest("foo"));
+            assertEquals(expectedState, resp);
+
+
+            time.sleep(2);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02"));
+
+            cluster.restartAgent("node02");
+            time.sleep(550);
+            // coordinator heartbeat sees that the agent is back up, re-schedules the task
but the agent expires it
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskDone(fooSpec, 2, 552, "worker expired", false, null)).
+                    workerState(new WorkerDone("foo", fooSpec, 552, 552, null, "worker expired")).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02"));
+
+            cluster.restartAgent("node02");
+            // coordinator heartbeat sees that the agent is back up but does not re-schedule
the task as it is DONE
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskDone(fooSpec, 2, 552, "worker expired", false, null)).
+                    // no worker states
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02"));
+        }
+    }
+
+    @Test
+    public void testTaskRequestWithOldStartMsGetsUpdated() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 500);
+            time.sleep(552);
+
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            NoOpTaskSpec updatedSpec = new NoOpTaskSpec(552, 500);
+            coordinatorClient.createTask(new CreateTaskRequest("fooSpec", fooSpec));
+            TaskState expectedState = new ExpectedTaskBuilder("fooSpec").taskState(
+                new TaskRunning(updatedSpec, 552, new TextNode("receiving"))
+            ).build().taskState();
+
+            TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec"));
+            assertEquals(expectedState, resp);
+        }
+    }
+
+    @Test
+    public void testTaskRequestWithFutureStartMsDoesNotGetRun() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 500);
+            time.sleep(999);
+
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            coordinatorClient.createTask(new CreateTaskRequest("fooSpec", fooSpec));
+            TaskState expectedState = new ExpectedTaskBuilder("fooSpec").taskState(
+                new TaskPending(fooSpec)
+            ).build().taskState();
+
+            TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec"));
+            assertEquals(expectedState, resp);
+        }
+    }
+
     @Test
     public void testTaskRequest() throws Exception {
         MockTime time = new MockTime(0, 0, 0);


Mime
View raw message