kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [2/3] kafka git commit: KAFKA-5776; Add the Trogdor fault injection daemon
Date Fri, 25 Aug 2017 19:30:11 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
new file mode 100644
index 0000000..9f1a19a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+
+import java.util.Set;
+
+/**
+ * A fault which can be activated and deactivated on a {@link Platform}.
+ */
+public interface Fault {
+    /**
+     * @return the ID of this fault.
+     */
+    String id();
+
+    /**
+     * @return the specification of this fault.
+     */
+    FaultSpec spec();
+
+    /**
+     * Determine which nodes this fault is targeting.
+     *
+     * @param topology  The topology to select target nodes from.
+     * @return          The names of the targeted nodes.
+     */
+    Set<String> targetNodes(Topology topology);
+
+    /**
+     * Activate the fault.
+     *
+     * @param platform    The platform on which to activate the fault.
+     * @throws Exception  If activation fails.
+     */
+    void activate(Platform platform) throws Exception;
+
+    /**
+     * Deactivate the fault.
+     *
+     * @param platform    The platform on which to deactivate the fault.
+     * @throws Exception  If deactivation fails.
+     */
+    void deactivate(Platform platform) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
new file mode 100644
index 0000000..63e5ff4
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+/**
+ * A set of faults, indexed both by start time and by end time.
+ *
+ * Times are stored as nanosecond keys (milliseconds * NS_PER_MS).  This leaves
+ * room to store several faults with the same millisecond time under distinct
+ * keys.  Faults are matched by object identity, not by equals().
+ *
+ * No synchronization is performed; callers must provide their own if needed.
+ */
+public class FaultSet {
+    private static final long NS_PER_MS = 1000000L;
+
+    /**
+     * Maps fault start times in nanoseconds to faults.
+     */
+    private final TreeMap<Long, Fault> byStart = new TreeMap<>();
+
+    /**
+     * Maps fault end times in nanoseconds to faults.
+     */
+    private final TreeMap<Long, Fault> byEnd = new TreeMap<>();
+
+    /**
+     * Return an iterator that iterates over the fault set in start time order.
+     */
+    public FaultSetIterator iterateByStart() {
+        return new FaultSetIterator(byStart);
+    }
+
+    /**
+     * Return an iterator that iterates over the fault set in end time order.
+     */
+    public FaultSetIterator iterateByEnd() {
+        return new FaultSetIterator(byEnd);
+    }
+
+    /**
+     * Add a new fault to the FaultSet.
+     */
+    public void add(Fault fault) {
+        insertUnique(byStart, startKey(fault), fault);
+        insertUnique(byEnd, endKey(fault), fault);
+    }
+
+    /**
+     * Compute the base key of a fault in the byStart map.
+     */
+    private static long startKey(Fault fault) {
+        return fault.spec().startMs() * NS_PER_MS;
+    }
+
+    /**
+     * Compute the base key of a fault in the byEnd map.
+     */
+    private static long endKey(Fault fault) {
+        long endMs = fault.spec().startMs() + fault.spec().durationMs();
+        return endMs * NS_PER_MS;
+    }
+
+    /**
+     * Insert a new fault to a TreeMap.
+     *
+     * If there is already a fault with the given key, the fault will be stored
+     * with the next available key.  If the exact same fault object is found
+     * while probing, nothing is inserted.
+     */
+    private static void insertUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
+        while (true) {
+            Fault existing = map.get(key);
+            if (existing == null) {
+                map.put(key, fault);
+                return;
+            } else if (existing == fault) {
+                return;
+            } else {
+                key++;
+            }
+        }
+    }
+
+    /**
+     * Remove a fault from the FaultSet.  The fault is removed by object identity.
+     *
+     * @throws NoSuchElementException  If the fault is not in the set.
+     */
+    public void remove(Fault fault) {
+        removeUnique(byStart, startKey(fault), fault);
+        removeUnique(byEnd, endKey(fault), fault);
+    }
+
+    /**
+     * Helper function to remove a fault from a map.  We will search every
+     * element of the map equal to or higher than the given key.
+     */
+    private static void removeUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
+        while (true) {
+            Map.Entry<Long, Fault> existing = map.ceilingEntry(key);
+            if (existing == null) {
+                throw new NoSuchElementException("No such element as " + fault);
+            } else if (existing.getValue() == fault) {
+                map.remove(existing.getKey());
+                return;
+            } else {
+                key = existing.getKey() + 1;
+            }
+        }
+    }
+
+    /**
+     * An iterator over the FaultSet.
+     *
+     * The iterator re-queries the underlying map on every step instead of
+     * holding a map iterator.  The set may therefore be modified during
+     * iteration, including through {@link #remove()}.
+     */
+    class FaultSetIterator implements Iterator<Fault> {
+        private final TreeMap<Long, Fault> map;
+        private Fault cur = null;
+
+        /**
+         * The key of the most recently returned entry, or null before the
+         * first call to next().  A null "before first" marker is used instead
+         * of a numeric sentinel.  A numeric sentinel would silently skip
+         * entries whose key is at or below it, e.g. negative keys produced by
+         * an overflowing end time.
+         */
+        private Long prevKey = null;
+
+        FaultSetIterator(TreeMap<Long, Fault> map) {
+            this.map = map;
+        }
+
+        /**
+         * Find the entry following the most recently returned one, or null if none.
+         */
+        private Map.Entry<Long, Fault> nextEntry() {
+            return (prevKey == null) ? map.firstEntry() : map.higherEntry(prevKey);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return nextEntry() != null;
+        }
+
+        @Override
+        public Fault next() {
+            Map.Entry<Long, Fault> entry = nextEntry();
+            if (entry == null) {
+                throw new NoSuchElementException();
+            }
+            prevKey = entry.getKey();
+            cur = entry.getValue();
+            return cur;
+        }
+
+        @Override
+        public void remove() {
+            if (cur == null) {
+                throw new IllegalStateException();
+            }
+            FaultSet.this.remove(cur);
+            cur = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
new file mode 100644
index 0000000..e15c4e9
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.common.utils.Utils;
+
+
+/**
+ * The specification for a fault.
+ *
+ * Specs are serialized to JSON with a "class" property which holds the
+ * fully qualified name of the concrete spec class.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,
+              include = JsonTypeInfo.As.PROPERTY,
+              property = "class")
+public interface FaultSpec {
+    class Util {
+        private static final String SPEC_STRING = "Spec";
+
+        /**
+         * Create the Fault which corresponds to a FaultSpec.
+         *
+         * The fault class name is derived by stripping the "Spec" suffix from
+         * the spec's class name (e.g. NoOpFaultSpec -> NoOpFault).  The fault
+         * class is instantiated through its (String, FaultSpec) constructor.
+         *
+         * @param faultId    The ID of the new fault.
+         * @param faultSpec  The specification of the new fault.
+         * @return           The new fault.
+         * @throws ClassNotFoundException    If the derived fault class cannot be found.
+         * @throws IllegalArgumentException  If the spec class name does not end with "Spec".
+         */
+        public static Fault createFault(String faultId, FaultSpec faultSpec) throws ClassNotFoundException {
+            String faultSpecClassName = faultSpec.getClass().getName();
+            if (!faultSpecClassName.endsWith(SPEC_STRING)) {
+                // Name the offending class so misconfigured specs are easy to track down.
+                throw new IllegalArgumentException("FaultSpec class name must end with " +
+                    SPEC_STRING + ", but got " + faultSpecClassName);
+            }
+            String faultClassName = faultSpecClassName.substring(0,
+                    faultSpecClassName.length() - SPEC_STRING.length());
+            return Utils.newParameterizedInstance(faultClassName,
+                String.class, faultId,
+                FaultSpec.class, faultSpec);
+        }
+    }
+
+    /**
+     * Get the start time of this fault in ms.
+     */
+    @JsonProperty
+    long startMs();
+
+    /**
+     * Get the duration of this fault in ms.
+     */
+    @JsonProperty
+    long durationMs();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
new file mode 100644
index 0000000..bec0792
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+
+/**
+ * The state of a fault: PENDING, RUNNING, or DONE.
+ *
+ * Serialized to JSON as a string (the constant's name).
+ */
+@JsonFormat(shape = JsonFormat.Shape.STRING)
+public enum FaultState {
+    PENDING, RUNNING, DONE
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
new file mode 100644
index 0000000..7524af1
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * A fault which partitions the network.
+ *
+ * When activated, the current node installs iptables rules that drop incoming
+ * TCP traffic from every node in a partition that does not contain the
+ * current node.  Deactivation deletes the same rules.
+ */
+public class NetworkPartitionFault implements Fault {
+    private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class);
+
+    private final String id;
+    private final NetworkPartitionFaultSpec spec;
+
+    /**
+     * The partitions, one set of node names per partition.  A node appears in
+     * at most one partition.
+     */
+    private final List<Set<String>> partitions;
+
+    /**
+     * Create a new NetworkPartitionFault.
+     *
+     * @param id    The fault ID.
+     * @param spec  The fault specification; must be a NetworkPartitionFaultSpec.
+     * @throws RuntimeException  If a node appears in more than one partition.
+     */
+    public NetworkPartitionFault(String id, FaultSpec spec) {
+        this.id = id;
+        this.spec = (NetworkPartitionFaultSpec) spec;
+        this.partitions = new ArrayList<>();
+        HashSet<String> prevNodes = new HashSet<>();
+        for (List<String> partition : this.spec.partitions()) {
+            for (String nodeName : partition) {
+                // Set.add returns false when the node has already been seen.
+                if (!prevNodes.add(nodeName)) {
+                    throw new RuntimeException("Node " + nodeName +
+                        " appears in more than one partition.");
+                }
+            }
+            // Record each partition exactly once, after all of its nodes are validated.
+            this.partitions.add(new HashSet<String>(partition));
+        }
+    }
+
+    @Override
+    public String id() {
+        return id;
+    }
+
+    @Override
+    public FaultSpec spec() {
+        return spec;
+    }
+
+    @Override
+    public void activate(Platform platform) throws Exception {
+        log.info("Activating NetworkPartitionFault...");
+        runIptablesCommands(platform, "-A");
+    }
+
+    @Override
+    public void deactivate(Platform platform) throws Exception {
+        log.info("Deactivating NetworkPartitionFault...");
+        runIptablesCommands(platform, "-D");
+    }
+
+    /**
+     * Add or delete the iptables rules that drop incoming TCP traffic on the
+     * current node from every node in a partition not containing it.
+     *
+     * @param platform        The platform to run the commands on.
+     * @param iptablesAction  "-A" to append the rules, or "-D" to delete them.
+     * @throws Exception      If a node cannot be found or resolved, or a command fails.
+     */
+    private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
+        Node curNode = platform.curNode();
+        Topology topology = platform.topology();
+        // A sorted set, so commands are issued in a deterministic (name) order.
+        TreeSet<String> toBlock = new TreeSet<>();
+        for (Set<String> partition : partitions) {
+            if (!partition.contains(curNode.name())) {
+                toBlock.addAll(partition);
+            }
+        }
+        for (String nodeName : toBlock) {
+            Node node = topology.node(nodeName);
+            if (node == null) {
+                throw new RuntimeException("Unable to find node " + nodeName + " in the topology.");
+            }
+            InetAddress addr = InetAddress.getByName(node.hostname());
+            platform.runCommand(new String[] {
+                "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s",
+                addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName
+            });
+        }
+    }
+
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        Set<String> targetNodes = new HashSet<>();
+        for (Set<String> partition : partitions) {
+            targetNodes.addAll(partition);
+        }
+        return targetNodes;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        NetworkPartitionFault that = (NetworkPartitionFault) o;
+        return Objects.equals(id, that.id) &&
+            Objects.equals(spec, that.spec) &&
+            Objects.equals(partitions, that.partitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, spec, partitions);
+    }
+
+    @Override
+    public String toString() {
+        return "NetworkPartitionFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
new file mode 100644
index 0000000..d734dce
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Specification of a fault that splits the nodes into network partitions.
+ *
+ * Each inner list of {@code partitions} holds the names of the nodes in one
+ * partition.
+ */
+public class NetworkPartitionFaultSpec extends AbstractFaultSpec {
+    private final List<List<String>> partitions;
+
+    @JsonCreator
+    public NetworkPartitionFaultSpec(@JsonProperty("startMs") long startMs,
+                                     @JsonProperty("durationMs") long durationMs,
+                                     @JsonProperty("partitions") List<List<String>> partitions) {
+        super(startMs, durationMs);
+        this.partitions = partitions;
+    }
+
+    /**
+     * @return the node-name lists, one per partition.
+     */
+    @JsonProperty
+    public List<List<String>> partitions() {
+        return partitions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        NetworkPartitionFaultSpec other = (NetworkPartitionFaultSpec) o;
+        return startMs() == other.startMs()
+            && durationMs() == other.durationMs()
+            && Objects.equals(partitions, other.partitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(startMs(), durationMs(), partitions);
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
new file mode 100644
index 0000000..c7ac4de
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class NoOpFault implements Fault {
+    private static final Logger log = LoggerFactory.getLogger(NoOpFault.class);
+
+    private final String id;
+    private final FaultSpec spec;
+
+    public NoOpFault(String id, FaultSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public String id() {
+        return id;
+    }
+
+    @Override
+    public FaultSpec spec() {
+        return spec;
+    }
+
+    @Override
+    public void activate(Platform platform) {
+        log.info("Activating NoOpFault...");
+    }
+
+    @Override
+    public void deactivate(Platform platform) {
+        log.info("Deactivating NoOpFault...");
+    }
+
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        Set<String> set = new HashSet<>();
+        for (Map.Entry<String, Node> entry : topology.nodes().entrySet()) {
+            if (Node.Util.getTrogdorAgentPort(entry.getValue()) > 0) {
+                set.add(entry.getKey());
+            }
+        }
+        return set;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        NoOpFault that = (NoOpFault) o;
+        return Objects.equals(id, that.id) &&
+            Objects.equals(spec, that.spec);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, spec);
+    }
+
+    @Override
+    public String toString() {
+        return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
new file mode 100644
index 0000000..1d4b94d
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Specification of a fault which does nothing.
+ *
+ * Intended mainly for testing the fault injection machinery itself.
+ */
+public class NoOpFaultSpec extends AbstractFaultSpec {
+    @JsonCreator
+    public NoOpFaultSpec(@JsonProperty("startMs") long startMs,
+                         @JsonProperty("durationMs") long durationMs) {
+        super(startMs, durationMs);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        NoOpFaultSpec other = (NoOpFaultSpec) o;
+        return startMs() == other.startMs() && durationMs() == other.durationMs();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(startMs(), durationMs());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
new file mode 100644
index 0000000..a1b5246
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Map;
+
+/**
+ * Response to GET /faults
+ */
+public class AgentFaultsResponse extends FaultDataMap {
+    @JsonCreator
+    public AgentFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
+        super(faults);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AgentFaultsResponse that = (AgentFaultsResponse) o;
+        return super.equals(that);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
new file mode 100644
index 0000000..8e32f87
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Objects;
+
+/**
+ * The status reported by the Trogdor agent.
+ */
+public class AgentStatusResponse {
+    private final long startTimeMs;
+
+    @JsonCreator
+    public AgentStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
+        this.startTimeMs = startTimeMs;
+    }
+
+    /**
+     * @return the start time reported by the agent, in milliseconds.
+     */
+    @JsonProperty
+    public long startTimeMs() {
+        return startTimeMs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        return startTimeMs == ((AgentStatusResponse) o).startTimeMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(startTimeMs);
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
new file mode 100644
index 0000000..df26274
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Map;
+
+/**
+ * Response to GET /faults
+ */
+public class CoordinatorFaultsResponse extends FaultDataMap {
+    @JsonCreator
+    public CoordinatorFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
+        super(faults);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CoordinatorFaultsResponse that = (CoordinatorFaultsResponse) o;
+        return super.equals(that);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
new file mode 100644
index 0000000..348e310
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Objects;
+
+/**
+ * The status of the Trogdor coordinator.
+ */
+public class CoordinatorStatusResponse {
+    /** The start time carried by this status, in milliseconds. */
+    private final long startTimeMs;
+
+    /**
+     * @param startTimeMs   The start time to report, in milliseconds.
+     */
+    @JsonCreator
+    public CoordinatorStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
+        this.startTimeMs = startTimeMs;
+    }
+
+    /**
+     * @return The start time carried by this status, in milliseconds.
+     */
+    @JsonProperty
+    public long startTimeMs() {
+        return startTimeMs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        CoordinatorStatusResponse other = (CoordinatorStatusResponse) o;
+        return startTimeMs == other.startTimeMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(startTimeMs);
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
new file mode 100644
index 0000000..6e772d9
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.fault.FaultSpec;
+
+import java.util.Objects;
+
+/**
+ * A request to the Trogdor agent to create a fault.
+ */
+public class CreateAgentFaultRequest {
+    private final String id;
+    private final FaultSpec spec;
+
+    /**
+     * @param id      The ID of the fault to create.
+     * @param spec    The specification of the fault to create.
+     */
+    @JsonCreator
+    public CreateAgentFaultRequest(@JsonProperty("id") String id,
+                                   @JsonProperty("spec") FaultSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    /**
+     * @return The ID of the fault to create.
+     */
+    @JsonProperty
+    public String id() {
+        return id;
+    }
+
+    /**
+     * @return The specification of the fault to create.
+     */
+    @JsonProperty
+    public FaultSpec spec() {
+        return spec;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        CreateAgentFaultRequest other = (CreateAgentFaultRequest) o;
+        if (!Objects.equals(id, other.id)) {
+            return false;
+        }
+        return Objects.equals(spec, other.spec);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, spec);
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
new file mode 100644
index 0000000..ec00cf3
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.fault.FaultSpec;
+
+import java.util.Objects;
+
+/**
+ * A request to the Trogdor coordinator to create a fault.
+ */
+public class CreateCoordinatorFaultRequest {
+    private final String id;
+    private final FaultSpec spec;
+
+    /**
+     * @param id      The ID of the fault to create.
+     * @param spec    The specification of the fault to create.
+     */
+    @JsonCreator
+    public CreateCoordinatorFaultRequest(@JsonProperty("id") String id,
+                                         @JsonProperty("spec") FaultSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    /**
+     * @return The ID of the fault to create.
+     */
+    @JsonProperty
+    public String id() {
+        return id;
+    }
+
+    /**
+     * @return The specification of the fault to create.
+     */
+    @JsonProperty
+    public FaultSpec spec() {
+        return spec;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        CreateCoordinatorFaultRequest other = (CreateCoordinatorFaultRequest) o;
+        if (!Objects.equals(id, other.id)) {
+            return false;
+        }
+        return Objects.equals(spec, other.spec);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, spec);
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java
new file mode 100644
index 0000000..da2fcba
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/Empty.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+/**
+ * An empty request or response.
+ */
+public class Empty {
+    /** A shared instance; every Empty is equal to every other Empty. */
+    public static final Empty INSTANCE = new Empty();
+
+    @JsonCreator
+    public Empty() {
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // Empty carries no state, so only the concrete class matters.
+        return o == this || (o != null && o.getClass() == getClass());
+    }
+
+    @Override
+    public int hashCode() {
+        return 1;
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java
new file mode 100644
index 0000000..08bf6cd
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Objects;
+
+/**
+ * An error response.
+ */
+public class ErrorResponse {
+    private final int code;
+    private final String message;
+
+    @JsonCreator
+    public ErrorResponse(@JsonProperty("code") int code,
+                         @JsonProperty("message") String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    @JsonProperty
+    public int code() {
+        return code;
+    }
+
+    @JsonProperty
+    public String message() {
+        return message;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ErrorResponse that = (ErrorResponse) o;
+        return Objects.equals(code, that.code) &&
+            Objects.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(code, message);
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
new file mode 100644
index 0000000..773d519
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.fault.FaultSpec;
+import org.apache.kafka.trogdor.fault.FaultState;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A map from string keys to {@link FaultData}.  This is the shared body of the
+ * responses to GET /faults.
+ */
+public class FaultDataMap {
+    private final Map<String, FaultData> faults;
+
+    /**
+     * The specification and state of a single fault.
+     */
+    public static class FaultData {
+        private final FaultSpec spec;
+        private final FaultState state;
+
+        /**
+         * @param spec      The fault specification.
+         * @param state     The fault state.
+         */
+        @JsonCreator
+        public FaultData(@JsonProperty("spec") FaultSpec spec,
+                         @JsonProperty("state") FaultState state) {
+            // The creator property must be named "state" to match the name that the
+            // state() accessor serializes under; otherwise serialized FaultData cannot
+            // be deserialized back without losing the state.
+            this.spec = spec;
+            this.state = state;
+        }
+
+        /**
+         * @return The fault specification.
+         */
+        @JsonProperty
+        public FaultSpec spec() {
+            return spec;
+        }
+
+        /**
+         * @return The fault state.
+         */
+        @JsonProperty
+        public FaultState state() {
+            return state;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            FaultData that = (FaultData) o;
+            return Objects.equals(spec, that.spec) &&
+                Objects.equals(state, that.state);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(spec, state);
+        }
+
+        @Override
+        public String toString() {
+            return JsonUtil.toJsonString(this);
+        }
+    }
+
+    /**
+     * @param faults    The fault map to wrap.  It is stored as given, not copied.
+     */
+    @JsonCreator
+    public FaultDataMap(@JsonProperty("faults") Map<String, FaultData> faults) {
+        this.faults = faults;
+    }
+
+    /**
+     * @return The wrapped fault map.
+     */
+    @JsonProperty
+    public Map<String, FaultData> faults() {
+        return faults;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        FaultDataMap that = (FaultDataMap) o;
+        return Objects.equals(faults, that.faults);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(faults);
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
new file mode 100644
index 0000000..1b23a9e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.Slf4jRequestLog;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Embedded server for the REST API that provides the control plane for Trogdor.
+ */
+public class JsonRestServer {
+    private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class);
+
+    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 2 * 1000;
+
+    private final Server jettyServer;
+
+    private final ServerConnector connector;
+
+    /**
+     * Create a REST server for this herder using the specified configs.
+     *
+     * @param port              The port number to use for the REST server, or
+     *                          0 to use a random port.
+     */
+    public JsonRestServer(int port) {
+        this.jettyServer = new Server();
+        this.connector = new ServerConnector(jettyServer);
+        if (port > 0) {
+            connector.setPort(port);
+        }
+        jettyServer.setConnectors(new Connector[]{connector});
+    }
+
+    /**
+     * Start the JsonRestServer.
+     *
+     * @param resources         The path handling resources to register.
+     */
+    public void start(Object... resources) {
+        log.info("Starting REST server");
+
+        ResourceConfig resourceConfig = new ResourceConfig();
+        resourceConfig.register(new JacksonJsonProvider(JsonUtil.JSON_SERDE));
+        for (Object resource : resources) {
+            resourceConfig.register(resource);
+            log.info("Registered resource {}", resource);
+        }
+        resourceConfig.register(RestExceptionMapper.class);
+        ServletContainer servletContainer = new ServletContainer(resourceConfig);
+        ServletHolder servletHolder = new ServletHolder(servletContainer);
+        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        context.setContextPath("/");
+        context.addServlet(servletHolder, "/*");
+
+        RequestLogHandler requestLogHandler = new RequestLogHandler();
+        Slf4jRequestLog requestLog = new Slf4jRequestLog();
+        requestLog.setLoggerName(JsonRestServer.class.getCanonicalName());
+        requestLog.setLogLatency(true);
+        requestLogHandler.setRequestLog(requestLog);
+
+        HandlerCollection handlers = new HandlerCollection();
+        handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
+        StatisticsHandler statsHandler = new StatisticsHandler();
+        statsHandler.setHandler(handlers);
+        jettyServer.setHandler(statsHandler);
+        /* Needed for graceful shutdown as per `setStopTimeout` documentation */
+        jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
+        jettyServer.setStopAtShutdown(true);
+
+        try {
+            jettyServer.start();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to start REST server", e);
+        }
+        log.info("REST server listening at " + jettyServer.getURI());
+    }
+
+    public int port() {
+        return connector.getLocalPort();
+    }
+
+    public void stop() {
+        log.info("Stopping REST server");
+
+        try {
+            jettyServer.stop();
+            jettyServer.join();
+            log.info("REST server stopped");
+        } catch (Exception e) {
+            log.error("Unable to stop REST server", e);
+        } finally {
+            jettyServer.destroy();
+        }
+    }
+
+    /**
+     * @param url               HTTP connection will be established with this url.
+     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData   Object to serialize as JSON and send in the request body.
+     * @param responseFormat    Expected format of the response to the HTTP request.
+     * @param <T>               The type of the deserialized response to the HTTP request.
+     * @return The deserialized response to the HTTP request, or null if no data is expected.
+     */
+    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
+                                    TypeReference<T> responseFormat) throws IOException {
+        HttpURLConnection connection = null;
+        try {
+            String serializedBody = requestBodyData == null ? null :
+                JsonUtil.JSON_SERDE.writeValueAsString(requestBodyData);
+            log.debug("Sending {} with input {} to {}", method, serializedBody, url);
+            connection = (HttpURLConnection) new URL(url).openConnection();
+            connection.setRequestMethod(method);
+            connection.setRequestProperty("User-Agent", "kafka");
+            connection.setRequestProperty("Accept", "application/json");
+
+            // connection.getResponseCode() implicitly calls getInputStream, so always set
+            // this to true.
+            connection.setDoInput(true);
+
+            connection.setUseCaches(false);
+
+            if (requestBodyData != null) {
+                connection.setRequestProperty("Content-Type", "application/json");
+                connection.setDoOutput(true);
+
+                OutputStream os = connection.getOutputStream();
+                os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
+                os.flush();
+                os.close();
+            }
+
+            int responseCode = connection.getResponseCode();
+            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
+                return new HttpResponse<>(null, new ErrorResponse(responseCode, connection.getResponseMessage()));
+            } else if ((responseCode >= 200) && (responseCode < 300)) {
+                InputStream is = connection.getInputStream();
+                T result = JsonUtil.JSON_SERDE.readValue(is, responseFormat);
+                is.close();
+                return new HttpResponse<>(result, null);
+            } else {
+                // If the resposne code was not in the 200s, we assume that this is an error
+                // response.
+                InputStream es = connection.getErrorStream();
+                if (es == null) {
+                    // Handle the case where HttpURLConnection#getErrorStream returns null.
+                    return new HttpResponse<>(null, new ErrorResponse(responseCode, ""));
+                }
+                // Try to read the error response JSON.
+                ErrorResponse error = JsonUtil.JSON_SERDE.readValue(es, ErrorResponse.class);
+                es.close();
+                return new HttpResponse<>(null, error);
+            }
+        } finally {
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+
+    public static class HttpResponse<T> {
+        private final T body;
+        private final ErrorResponse error;
+
+        HttpResponse(T body, ErrorResponse error) {
+            this.body = body;
+            this.error = error;
+        }
+
+        public T body() throws Exception {
+            if (error != null) {
+                throw RestExceptionMapper.toException(error.code(), error.message());
+            }
+            return body;
+        }
+
+        public ErrorResponse error() {
+            return error;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
new file mode 100644
index 0000000..b063a9a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+
+/**
+ * Maps exceptions thrown by REST resources to HTTP responses that carry an
+ * {@link ErrorResponse} body, and maps error codes back to exceptions on the client side.
+ */
+public class RestExceptionMapper implements ExceptionMapper<Throwable> {
+    private static final Logger log = LoggerFactory.getLogger(RestExceptionMapper.class);
+
+    /**
+     * Convert an uncaught exception from a REST call into an error response.
+     *
+     * @param e     The uncaught exception.
+     * @return      A response whose status reflects the exception type.
+     */
+    @Override
+    public Response toResponse(Throwable e) {
+        if (log.isDebugEnabled()) {
+            log.debug("Uncaught exception in REST call: ", e);
+        } else if (log.isInfoEnabled()) {
+            log.info("Uncaught exception in REST call: {}", e.getMessage());
+        }
+        if (e instanceof NotFoundException) {
+            return buildResponse(Response.Status.NOT_FOUND, e);
+        } else if (e instanceof InvalidTypeIdException) {
+            // InvalidTypeIdException is a subclass of JsonMappingException, so it must be
+            // tested first.  Otherwise this branch is unreachable and unknown type IDs
+            // are reported as BAD_REQUEST.
+            return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
+        } else if (e instanceof JsonMappingException) {
+            return buildResponse(Response.Status.BAD_REQUEST, e);
+        } else if (e instanceof ClassNotFoundException) {
+            return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
+        } else if (e instanceof SerializationException) {
+            return buildResponse(Response.Status.BAD_REQUEST, e);
+        } else {
+            return buildResponse(Response.Status.INTERNAL_SERVER_ERROR, e);
+        }
+    }
+
+    /**
+     * Throw the exception corresponding to an HTTP error code.  This approximately
+     * inverts {@link #toResponse(Throwable)}.
+     *
+     * The method always throws and never returns normally.  It is declared to return
+     * Exception so that callers can write {@code throw toException(code, msg)}.
+     *
+     * @param code  The HTTP status code.
+     * @param msg   The error message.
+     * @throws Exception NotFoundException for 404, ClassNotFoundException for 501,
+     *                   SerializationException for 400, RuntimeException otherwise.
+     */
+    public static Exception toException(int code, String msg) throws Exception {
+        if (code == Response.Status.NOT_FOUND.getStatusCode()) {
+            throw new NotFoundException(msg);
+        } else if (code == Response.Status.NOT_IMPLEMENTED.getStatusCode()) {
+            throw new ClassNotFoundException(msg);
+        } else if (code == Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new SerializationException(msg);
+        } else {
+            throw new RuntimeException(msg);
+        }
+    }
+
+    private Response buildResponse(Response.Status code, Throwable e) {
+        return Response.status(code).
+            entity(new ErrorResponse(code.getStatusCode(), e.getMessage())).
+            build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
new file mode 100644
index 0000000..c587e44
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.agent;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.basic.BasicNode;
+import org.apache.kafka.trogdor.basic.BasicPlatform;
+import org.apache.kafka.trogdor.basic.BasicTopology;
+import org.apache.kafka.trogdor.common.ExpectedFaults;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
+import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
+import org.apache.kafka.trogdor.rest.AgentStatusResponse;
+import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
+
+import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class AgentTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    private static BasicPlatform createBasicPlatform() {
+        TreeMap<String, Node> nodes = new TreeMap<>();
+        HashMap<String, String> config = new HashMap<>();
+        nodes.put("node01", new BasicNode("node01", "localhost",
+            config, Collections.<String>emptySet()));
+        BasicTopology topology = new BasicTopology(nodes);
+        return new BasicPlatform("node01", topology, new BasicPlatform.ShellCommandRunner());
+    }
+
+    private Agent createAgent(Time time) {
+        JsonRestServer restServer = new JsonRestServer(0);
+        AgentRestResource resource = new AgentRestResource();
+        restServer.start(resource);
+        return new Agent(createBasicPlatform(), time, restServer, resource);
+    }
+
+    @Test
+    public void testAgentStartShutdown() throws Exception {
+        Agent agent = createAgent(Time.SYSTEM);
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
+
+    @Test
+    public void testAgentProgrammaticShutdown() throws Exception {
+        Agent agent = createAgent(Time.SYSTEM);
+        AgentClient client = new AgentClient("localhost", agent.port());
+        client.invokeShutdown();
+        agent.waitForShutdown();
+    }
+
+    @Test
+    public void testAgentGetStatus() throws Exception {
+        Agent agent = createAgent(Time.SYSTEM);
+        AgentClient client = new AgentClient("localhost", agent.port());
+        AgentStatusResponse status = client.getStatus();
+        assertEquals(agent.startTimeMs(), status.startTimeMs());
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
+
+    @Test
+    public void testAgentCreateFaults() throws Exception {
+        Time time = new MockTime(0, 0, 0);
+        Agent agent = createAgent(time);
+        AgentClient client = new AgentClient("localhost", agent.port());
+        AgentFaultsResponse faults = client.getFaults();
+        assertEquals(Collections.emptyMap(), faults.faults());
+        new ExpectedFaults().waitFor(client);
+
+        final NoOpFaultSpec fooSpec = new NoOpFaultSpec(1000, 600000);
+        client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
+        new ExpectedFaults().addFault("foo", fooSpec).waitFor(client);
+
+        final NoOpFaultSpec barSpec = new NoOpFaultSpec(2000, 900000);
+        client.putFault(new CreateAgentFaultRequest("bar", barSpec));
+        new ExpectedFaults().
+            addFault("foo", fooSpec).
+            addFault("bar", barSpec).
+            waitFor(client);
+
+        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 450000);
+        client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
+        new ExpectedFaults().
+            addFault("foo", fooSpec).
+            addFault("bar", barSpec).
+            addFault("baz", bazSpec).
+            waitFor(client);
+
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
+
+    @Test
+    public void testAgentActivatesFaults() throws Exception {
+        Time time = new MockTime(0, 0, 0);
+        Agent agent = createAgent(time);
+        AgentClient client = new AgentClient("localhost", agent.port());
+        AgentFaultsResponse faults = client.getFaults();
+        assertEquals(Collections.emptyMap(), faults.faults());
+        new ExpectedFaults().waitFor(client);
+
+        final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2);
+        client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
+        new ExpectedFaults().addFault("foo", FaultState.RUNNING).waitFor(client);
+
+        final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3);
+        client.putFault(new CreateAgentFaultRequest("bar", barSpec));
+        time.sleep(11);
+        new ExpectedFaults().
+            addFault("foo", FaultState.RUNNING).
+            addFault("bar", FaultState.RUNNING).
+            waitFor(client);
+
+        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 11);
+        client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
+        new ExpectedFaults().
+            addFault("foo", FaultState.RUNNING).
+            addFault("bar", FaultState.RUNNING).
+            addFault("baz", FaultState.RUNNING).
+            waitFor(client);
+
+        time.sleep(2);
+        new ExpectedFaults().
+            addFault("foo", FaultState.DONE).
+            addFault("bar", FaultState.RUNNING).
+            addFault("baz", FaultState.DONE).
+            waitFor(client);
+
+        time.sleep(100);
+        new ExpectedFaults().
+            addFault("foo", FaultState.DONE).
+            addFault("bar", FaultState.DONE).
+            addFault("baz", FaultState.DONE).
+            waitFor(client);
+
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
+};

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java b/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java
new file mode 100644
index 0000000..c0dd680
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/basic/BasicPlatformTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.basic;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.trogdor.common.Platform;
+
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for loading a {@link BasicPlatform} from a JSON configuration file.
+ */
+public class BasicPlatformTest {
+    /** JSON config describing two localhost nodes, bob01 and bob02. */
+    private static final String TWO_NODE_CONFIG =
+        "{\n" +
+        "  \"platform\": \"org.apache.kafka.trogdor.basic.BasicPlatform\",\n" +
+        "  \"nodes\": {\n" +
+        "    \"bob01\": {\n" +
+        "      \"hostname\": \"localhost\",\n" +
+        "      \"trogdor.agent.port\": 8888\n" +
+        "    },\n" +
+        "    \"bob02\": {\n" +
+        "      \"hostname\": \"localhost\",\n" +
+        "      \"trogdor.agent.port\": 8889\n" +
+        "    }\n" +
+        "  }\n" +
+        "}\n";
+
+    @Rule
+    public final Timeout globalTimeout = Timeout.millis(120000);
+
+    /** Writes {@code contents} to {@code file} as UTF-8, truncating any previous contents. */
+    private static void writeUtf8(File file, String contents) throws Exception {
+        try (OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(file),
+                StandardCharsets.UTF_8)) {
+            out.write(contents);
+        }
+    }
+
+    /**
+     * Parses the two-node config as node "bob01" and checks the platform name and node set.
+     * The temporary config file is always deleted afterwards.
+     */
+    @Test
+    public void testCreateBasicPlatform() throws Exception {
+        File configFile = TestUtils.tempFile();
+        try {
+            writeUtf8(configFile, TWO_NODE_CONFIG);
+            Platform parsed = Platform.Config.parse("bob01", configFile.getPath());
+            assertEquals("BasicPlatform", parsed.name());
+            assertEquals(2, parsed.topology().nodes().size());
+            String nodeNames = Utils.join(parsed.topology().nodes().keySet(), ", ");
+            assertEquals("bob01, bob02", nodeNames);
+        } finally {
+            Files.delete(configFile.toPath());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java b/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java
new file mode 100644
index 0000000..2e5a660
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/CapturingCommandRunner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.basic.BasicPlatform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link BasicPlatform.CommandRunner} for tests. Instead of executing commands, it records
+ * each command line per node and returns an empty string as the command's output.
+ */
+public class CapturingCommandRunner implements BasicPlatform.CommandRunner {
+    private static final Logger log = LoggerFactory.getLogger(CapturingCommandRunner.class);
+
+    /** Recorded command lines keyed by node name. Guarded by {@code this}. */
+    private final Map<String, List<String>> commands = new HashMap<>();
+
+    /**
+     * Returns the mutable list of recorded lines for {@code nodeName}, creating an empty one
+     * on first use. Callers must already hold the lock on {@code this}.
+     */
+    private List<String> getOrCreate(String nodeName) {
+        List<String> lines = commands.get(nodeName);
+        if (lines == null) {
+            lines = new ArrayList<>();
+            commands.put(nodeName, lines);
+        }
+        return lines;
+    }
+
+    /**
+     * Records {@code command}, joined with single spaces, under the name of {@code curNode}.
+     *
+     * @return always the empty string; nothing is executed
+     */
+    @Override
+    public String run(Node curNode, String[] command) throws IOException {
+        String line = Utils.join(command, " ");
+        synchronized (this) {
+            getOrCreate(curNode.name()).add(line);
+        }
+        // Reuse the already-joined line instead of joining the command a second time.
+        log.debug("RAN {}: {}", curNode, line);
+        return "";
+    }
+
+    /**
+     * Returns a snapshot copy of the lines recorded so far for {@code nodeName}, in the
+     * order they were recorded. The result is empty if nothing was run on that node.
+     */
+    public synchronized List<String> lines(String nodeName) {
+        return new ArrayList<>(getOrCreate(nodeName));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java
new file mode 100644
index 0000000..1fab903
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.trogdor.agent.AgentClient;
+import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
+import org.apache.kafka.trogdor.fault.FaultSpec;
+import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
+import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class ExpectedFaults {
+    private static final Logger log = LoggerFactory.getLogger(ExpectedFaults.class);
+
+    private static class FaultData {
+        final FaultSpec spec;
+        final FaultState state;
+
+        FaultData(FaultSpec spec, FaultState state) {
+            this.spec = spec;
+            this.state = state;
+        }
+    }
+
+    private interface FaultFetcher {
+        TreeMap<String, FaultData> fetch() throws Exception;
+    }
+
+    private static class AgentFaultFetcher implements FaultFetcher {
+        private final AgentClient client;
+
+        AgentFaultFetcher(AgentClient client) {
+            this.client = client;
+        }
+
+        @Override
+        public TreeMap<String, FaultData> fetch() throws Exception {
+            TreeMap<String, FaultData> results = new TreeMap<>();
+            AgentFaultsResponse response = client.getFaults();
+            for (Map.Entry<String, AgentFaultsResponse.FaultData> entry :
+                    response.faults().entrySet()) {
+                results.put(entry.getKey(),
+                    new FaultData(entry.getValue().spec(), entry.getValue().state()));
+            }
+            return results;
+        }
+    }
+
+    private static class CoordinatorFaultFetcher implements FaultFetcher {
+        private final CoordinatorClient client;
+
+        CoordinatorFaultFetcher(CoordinatorClient client) {
+            this.client = client;
+        }
+
+        @Override
+        public TreeMap<String, FaultData> fetch() throws Exception {
+            TreeMap<String, FaultData> results = new TreeMap<>();
+            CoordinatorFaultsResponse response = client.getFaults();
+            for (Map.Entry<String, CoordinatorFaultsResponse.FaultData> entry :
+                response.faults().entrySet()) {
+                results.put(entry.getKey(),
+                    new FaultData(entry.getValue().spec(), entry.getValue().state()));
+            }
+            return results;
+        }
+    }
+
+    private final TreeMap<String, FaultData> expected = new TreeMap<String, FaultData>();
+
+    public ExpectedFaults addFault(String id, FaultSpec spec) {
+        expected.put(id, new FaultData(spec, null));
+        return this;
+    }
+
+    public ExpectedFaults addFault(String id, FaultState state) {
+        expected.put(id, new FaultData(null, state));
+        return this;
+    }
+
+    public ExpectedFaults addFault(String id, FaultSpec spec, FaultState state) {
+        expected.put(id, new FaultData(spec, state));
+        return this;
+    }
+
+    public ExpectedFaults waitFor(AgentClient agentClient) throws InterruptedException {
+        waitFor(new AgentFaultFetcher(agentClient));
+        return this;
+    }
+
+    public ExpectedFaults waitFor(CoordinatorClient client) throws InterruptedException {
+        waitFor(new CoordinatorFaultFetcher(client));
+        return this;
+    }
+
+    private void waitFor(final FaultFetcher faultFetcher) throws InterruptedException {
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                TreeMap<String, FaultData> curData = null;
+                try {
+                    curData = faultFetcher.fetch();
+                } catch (Exception e) {
+                    log.info("Got error fetching faults", e);
+                    throw new RuntimeException(e);
+                }
+                StringBuilder errors = new StringBuilder();
+                for (Map.Entry<String, FaultData> entry : expected.entrySet()) {
+                    String id = entry.getKey();
+                    FaultData expectedFaultData = entry.getValue();
+                    FaultData curFaultData = curData.get(id);
+                    if (curFaultData == null) {
+                        errors.append("Did not find fault id " + id + "\n");
+                    } else {
+                        if (expectedFaultData.spec != null) {
+                            if (!expectedFaultData.spec.equals(curFaultData.spec)) {
+                                errors.append("For fault id " + id + ", expected fault " +
+                                    "spec " + expectedFaultData.spec + ", but got " +
+                                    curFaultData.spec + "\n");
+                            }
+                        }
+                        if (expectedFaultData.state != null) {
+                            if (!expectedFaultData.state.equals(curFaultData.state)) {
+                                errors.append("For fault id " + id + ", expected fault " +
+                                    "state " + expectedFaultData.state + ", but got " +
+                                    curFaultData.state + "\n");
+                            }
+                        }
+                    }
+                }
+                for (String id : curData.keySet()) {
+                    if (expected.get(id) == null) {
+                        errors.append("Got unexpected fault id " + id + "\n");
+                    }
+                }
+                String errorString = errors.toString();
+                if (!errorString.isEmpty()) {
+                    log.info("EXPECTED FAULTS: {}", faultsToString(expected));
+                    log.info("ACTUAL FAULTS  : {}", faultsToString(curData));
+                    log.info(errorString);
+                    return false;
+                }
+                return true;
+            }
+        }, "Timed out waiting for expected fault specs " + faultsToString(expected));
+    }
+
+    private static String faultsToString(TreeMap<String, FaultData> faults) {
+        StringBuilder bld = new StringBuilder();
+        bld.append("{");
+        String faultsPrefix = "";
+        for (Map.Entry<String, FaultData> entry : faults.entrySet()) {
+            String id = entry.getKey();
+            bld.append(faultsPrefix).append(id).append(": {");
+            faultsPrefix = ", ";
+            String faultValuesPrefix = "";
+            FaultData faultData = entry.getValue();
+            if (faultData.spec != null) {
+                bld.append(faultValuesPrefix).append("spec: ").append(faultData.spec);
+                faultValuesPrefix = ", ";
+            }
+            if (faultData.state != null) {
+                bld.append(faultValuesPrefix).append("state: ").append(faultData.state);
+                faultValuesPrefix = ", ";
+            }
+            bld.append("}");
+        }
+        bld.append("}");
+        return bld.toString();
+    }
+};


Mime
View raw message