kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: MINOR: Avoid nulls when deserializing Trogodor JSON (#4688)
Date Thu, 15 Mar 2018 11:44:30 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c10e06  MINOR: Avoid nulls when deserializing Trogodor JSON (#4688)
8c10e06 is described below

commit 8c10e0600736e8620f868ae15516d1fc06f775a2
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Thu Mar 15 04:44:27 2018 -0700

    MINOR: Avoid nulls when deserializing Trogodor JSON (#4688)
---
 .../org/apache/kafka/trogdor/common/JsonUtil.java  |  2 +
 .../trogdor/fault/FilesUnreadableFaultSpec.java    |  7 ++-
 .../org/apache/kafka/trogdor/fault/Kibosh.java     |  2 +-
 .../trogdor/fault/NetworkPartitionFaultSpec.java   |  2 +-
 .../kafka/trogdor/fault/ProcessStopFaultSpec.java  |  4 +-
 .../kafka/trogdor/rest/AgentStatusResponse.java    |  2 +-
 .../apache/kafka/trogdor/rest/TasksResponse.java   |  9 ++-
 .../org/apache/kafka/trogdor/rest/WorkerDone.java  |  4 +-
 .../apache/kafka/trogdor/rest/WorkerRunning.java   |  2 +-
 .../apache/kafka/trogdor/rest/WorkerStopping.java  |  2 +-
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |  7 ++-
 .../trogdor/workload/RoundTripWorkloadSpec.java    |  8 ++-
 .../trogdor/common/JsonSerializationTest.java      | 69 ++++++++++++++++++++++
 .../apache/kafka/trogdor/task/SampleTaskSpec.java  |  2 +-
 14 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
index 9ec737e..70193c3 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.trogdor.common;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,6 +33,7 @@ public class JsonUtil {
         JSON_SERDE = new ObjectMapper();
         JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+        JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
     }
 
     public static String toJsonString(Object object) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
index 1fbf9d0..cb520c4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FilesUnreadableFaultSpec.java
@@ -24,6 +24,7 @@ import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -43,9 +44,9 @@ public class FilesUnreadableFaultSpec extends TaskSpec {
                                     @JsonProperty("prefix") String prefix,
                                     @JsonProperty("errorCode") int errorCode) {
         super(startMs, durationMs);
-        this.nodeNames = nodeNames;
-        this.mountPath = mountPath;
-        this.prefix = prefix;
+        this.nodeNames = nodeNames == null ? new HashSet<String>() : nodeNames;
+        this.mountPath = mountPath == null ? "" : mountPath;
+        this.prefix = prefix == null ? "" : prefix;
         this.errorCode = errorCode;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
index 6fa1a4b..d34acf0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
@@ -133,7 +133,7 @@ public final class Kibosh {
 
         @JsonCreator
         public KiboshControlFile(@JsonProperty("faults") List<KiboshFaultSpec> faults)
{
-            this.faults = faults;
+            this.faults = faults == null ? new ArrayList<KiboshFaultSpec>() : faults;
         }
 
         @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
index 7b9ccc4..c3df792 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
@@ -39,7 +39,7 @@ public class NetworkPartitionFaultSpec extends TaskSpec {
                          @JsonProperty("durationMs") long durationMs,
                          @JsonProperty("partitions") List<List<String>> partitions)
{
         super(startMs, durationMs);
-        this.partitions = partitions;
+        this.partitions = partitions == null ? new ArrayList<List<String>>()
: partitions;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java
index 505baa9..cda2fb0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java
@@ -40,8 +40,8 @@ public class ProcessStopFaultSpec extends TaskSpec {
                         @JsonProperty("nodeNames") List<String> nodeNames,
                         @JsonProperty("javaProcessName") String javaProcessName) {
         super(startMs, durationMs);
-        this.nodeNames = new HashSet<>(nodeNames);
-        this.javaProcessName = javaProcessName;
+        this.nodeNames = nodeNames == null ? new HashSet<String>() : new HashSet<>(nodeNames);
+        this.javaProcessName = javaProcessName == null ? "" : javaProcessName;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
index 77b4bfb..c505e75 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
@@ -33,7 +33,7 @@ public class AgentStatusResponse extends Message {
     public AgentStatusResponse(@JsonProperty("serverStartMs") long serverStartMs,
             @JsonProperty("workers") TreeMap<String, WorkerState> workers) {
         this.serverStartMs = serverStartMs;
-        this.workers = workers;
+        this.workers = workers == null ? new TreeMap<String, WorkerState>() : workers;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
index d3b415b..5a3149c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
@@ -20,21 +20,24 @@ package org.apache.kafka.trogdor.rest;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.TreeMap;
 
 /**
  * The response to /coordinator/tasks
  */
 public class TasksResponse extends Message {
-    private final TreeMap<String, TaskState> tasks;
+    private final Map<String, TaskState> tasks;
 
     @JsonCreator
     public TasksResponse(@JsonProperty("tasks") TreeMap<String, TaskState> tasks) {
-        this.tasks = tasks;
+        this.tasks = Collections.unmodifiableMap((tasks == null) ?
+            new TreeMap<String, TaskState>() : tasks);
     }
 
     @JsonProperty
-    public TreeMap<String, TaskState> tasks() {
+    public Map<String, TaskState> tasks() {
         return tasks;
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
index 0f46b25..e463ffc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -55,8 +55,8 @@ public class WorkerDone extends WorkerState {
         super(spec);
         this.startedMs = startedMs;
         this.doneMs = doneMs;
-        this.status = status;
-        this.error = error;
+        this.status = status == null ? "" : status;
+        this.error = error == null ? "" : error;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
index 3d8323d..e3b8d19 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -42,7 +42,7 @@ public class WorkerRunning extends WorkerState {
             @JsonProperty("status") String status) {
         super(spec);
         this.startedMs = startedMs;
-        this.status = status;
+        this.status = status == null ? "" : status;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
index fa2d546..777e511 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -42,7 +42,7 @@ public class WorkerStopping extends WorkerState {
             @JsonProperty("status") String status) {
         super(spec);
         this.startedMs = startedMs;
-        this.status = status;
+        this.status = status == null ? "" : status;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index efb2d85..3e05a53 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -26,6 +26,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.Set;
 
 /**
@@ -53,12 +54,12 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("totalTopics") int totalTopics,
                          @JsonProperty("activeTopics") int activeTopics) {
         super(startMs, durationMs);
-        this.producerNode = producerNode;
-        this.bootstrapServers = bootstrapServers;
+        this.producerNode = (producerNode == null) ? "" : producerNode;
+        this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
         this.targetMessagesPerSec = targetMessagesPerSec;
         this.maxMessages = maxMessages;
         this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize;
-        this.producerConf = producerConf;
+        this.producerConf = (producerConf == null) ? new TreeMap<String, String>()
: producerConf;
         this.totalTopics = totalTopics;
         this.activeTopics = activeTopics;
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 24c9e77..618c709 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.Set;
+import java.util.TreeMap;
 
 /**
  * The specification for a workload that sends messages to a broker and then
@@ -49,10 +50,11 @@ public class RoundTripWorkloadSpec extends TaskSpec {
              @JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>>
partitionAssignments,
              @JsonProperty("maxMessages") int maxMessages) {
         super(startMs, durationMs);
-        this.clientNode = clientNode;
-        this.bootstrapServers = bootstrapServers;
+        this.clientNode = clientNode == null ? "" : clientNode;
+        this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers;
         this.targetMessagesPerSec = targetMessagesPerSec;
-        this.partitionAssignments = partitionAssignments;
+        this.partitionAssignments = partitionAssignments == null ?
+            new TreeMap<Integer, List<Integer>>() : partitionAssignments;
         this.maxMessages = maxMessages;
     }
 
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
new file mode 100644
index 0000000..4e65d99
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
+import org.apache.kafka.trogdor.fault.Kibosh;
+import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
+import org.apache.kafka.trogdor.fault.ProcessStopFaultSpec;
+import org.apache.kafka.trogdor.rest.AgentStatusResponse;
+import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerRunning;
+import org.apache.kafka.trogdor.rest.WorkerStopping;
+import org.apache.kafka.trogdor.task.SampleTaskSpec;
+import org.apache.kafka.trogdor.workload.ProduceBenchSpec;
+import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.assertNotNull;
+
+public class JsonSerializationTest {
+    @Test
+    public void testDeserializationDoesNotProduceNulls() throws Exception {
+        verify(new FilesUnreadableFaultSpec(0, 0, null,
+            null, null, 0));
+        verify(new Kibosh.KiboshControlFile(null));
+        verify(new NetworkPartitionFaultSpec(0, 0, null));
+        verify(new ProcessStopFaultSpec(0, 0, null, null));
+        verify(new AgentStatusResponse(0, null));
+        verify(new TasksResponse(null));
+        verify(new WorkerDone(null, 0, 0, null, null));
+        verify(new WorkerRunning(null, 0, null));
+        verify(new WorkerStopping(null, 0, null));
+        verify(new ProduceBenchSpec(0, 0, null, null,
+            0, 0, 0, null, 0, 0));
+        verify(new RoundTripWorkloadSpec(0, 0, null, null,
+            0, null, 0));
+        verify(new SampleTaskSpec(0, 0, 0, null));
+    }
+
+    private <T> void verify(T val1) throws Exception {
+        byte[] bytes = JsonUtil.JSON_SERDE.writeValueAsBytes(val1);
+        Class<T> clazz = (Class<T>) val1.getClass();
+        T val2 = JsonUtil.JSON_SERDE.readValue(bytes, clazz);
+        for (Field field : clazz.getDeclaredFields()) {
+            boolean wasAccessible = field.isAccessible();
+            field.setAccessible(true);
+            assertNotNull("Field " + field + " was null.", field.get(val2));
+            field.setAccessible(wasAccessible);
+        }
+    }
+};
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
index 2bbbb20..26fdfb2 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
@@ -31,7 +31,7 @@ public class SampleTaskSpec extends TaskSpec {
                         @JsonProperty("error") String error) {
         super(startMs, durationMs);
         this.exitMs = exitMs;
-        this.error = error;
+        this.error = error == null ? "" : error;
     }
 
     @JsonProperty

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message