kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5631; Use Jackson for serialising to JSON
Date Tue, 12 Dec 2017 16:34:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 651c6e480 -> 0a508a436


KAFKA-5631; Use Jackson for serialising to JSON

- Rename `encode` to `legacyEncodeAsString`; we
can remove it when we remove `ZkUtils`.
- Introduce `encodeAsString` that uses Jackson.
- Change `encodeAsBytes` to use Jackson.
- Avoid the intermediate string when converting
a Broker to JSON bytes.

Unlike `legacyEncodeAsString`, the methods that
use Jackson support only Java collections.

Tests were added for `encodeAsString` and
`encodeAsBytes`.
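
As a minimal usage sketch against the `Json` API in this patch (the
map contents below are illustrative, not a real payload): the
Jackson-backed encoders take Java collections, while the legacy
encoder keeps taking Scala ones until `ZkUtils` goes away.

    import kafka.utils.Json
    import scala.collection.JavaConverters._

    // Jackson-backed encoders accept Java collections only, so Scala
    // maps and seqs are converted at the call site with `.asJava`:
    val bytes: Array[Byte] =
      Json.encodeAsBytes(Map("version" -> 1, "broker" -> 0).asJava)

    // The legacy encoder still accepts Scala collections directly:
    val legacy: String =
      Json.legacyEncodeAsString(Map("version" -> 1, "broker" -> 0))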

Author: umesh chaudhary <umesh9794@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4259 from umesh9794/KAFKA-5631


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a508a43
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a508a43
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a508a43

Branch: refs/heads/trunk
Commit: 0a508a436c928c0440e8b90dda98c22dc0ec244c
Parents: 651c6e4
Author: umesh chaudhary <umesh9794@gmail.com>
Authored: Tue Dec 12 16:00:04 2017 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Dec 12 18:34:03 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  6 +-
 .../main/scala/kafka/admin/LogDirsCommand.scala | 16 ++--
 .../kafka/admin/ReassignPartitionsCommand.scala | 13 ++-
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |  4 +-
 core/src/main/scala/kafka/cluster/Broker.scala  | 14 ++--
 .../consumer/ZookeeperConsumerConnector.scala   | 10 ++-
 .../transaction/ProducerIdManager.scala         |  4 +-
 .../main/scala/kafka/security/auth/Acl.scala    |  3 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 16 ++--
 core/src/main/scala/kafka/utils/Json.scala      | 30 ++++---
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 17 ++--
 core/src/main/scala/kafka/zk/ZkData.scala       | 46 ++++++-----
 .../test/scala/unit/kafka/admin/AdminTest.scala |  4 +-
 .../kafka/consumer/PartitionAssignorTest.scala  |  6 +-
 .../unit/kafka/security/auth/AclTest.scala      |  5 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  9 +-
 .../test/scala/unit/kafka/utils/JsonTest.scala  | 87 +++++++++++++++-----
 .../unit/kafka/utils/ReplicationUtilsTest.scala |  5 +-
 .../scala/unit/kafka/utils/ZkUtilsTest.scala    |  2 +-
 gradle/dependencies.gradle                      |  2 +-
 20 files changed, 187 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 4d0ad58..f21b942 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -25,7 +25,7 @@ import java.util.Random
 import java.util.Properties
 
 import kafka.common.TopicAlreadyMarkedForDeletionException
-import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors._
 
 import collection.{Map, Set, mutable, _}
 import scala.collection.JavaConverters._
@@ -628,7 +628,7 @@ object AdminUtils extends Logging with AdminUtilities {
 
     // create the change notification
     val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
-    val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath))
+    val content = Json.legacyEncodeAsString(getConfigChangeZnodeData(sanitizedEntityPath))
     zkUtils.createSequentialPersistentPath(seqNode, content)
   }
 
@@ -641,7 +641,7 @@ object AdminUtils extends Logging with AdminUtilities {
    */
   private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
     val map = Map("version" -> 1, "config" -> config.asScala)
-    zkUtils.updatePersistentPath(entityPath, Json.encode(map))
+    zkUtils.updatePersistentPath(entityPath, Json.legacyEncodeAsString(map))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/admin/LogDirsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index 6a167a2..d8e1beb 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -56,7 +56,7 @@ object LogDirsCommand {
     }
 
     private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = {
-        Json.encode(Map(
+        Json.encodeAsString(Map(
             "version" -> 1,
             "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
                 Map(
@@ -73,13 +73,13 @@ object LogDirsCommand {
                                     "size" -> replicaInfo.size,
                                     "offsetLag" -> replicaInfo.offsetLag,
                                     "isFuture" -> replicaInfo.isFuture
-                                )
-                            }
-                        )
-                    }
-                )
-            }
-        ))
+                                ).asJava
+                            }.asJava
+                        ).asJava
+                    }.asJava
+                ).asJava
+            }.asJava
+        ).asJava)
     }
 
     private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index e8aad7f..811e56e 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -43,7 +43,6 @@ object ReassignPartitionsCommand extends Logging {
   private[admin] val AnyLogDir = "any"
 
   def main(args: Array[String]): Unit = {
-
     val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
     val zkUtils = ZkUtils(zkConnect,
@@ -224,17 +223,17 @@ object ReassignPartitionsCommand extends Logging {
 
   def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
                               replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
-    Json.encode(Map(
+    Json.encodeAsString(Map(
       "version" -> 1,
       "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition),
replicas) =>
         Map(
           "topic" -> topic,
           "partition" -> partition,
-          "replicas" -> replicas,
-          "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic,
partition, r), AnyLogDir))
-        )
-      }
-    ))
+          "replicas" -> replicas.asJava,
+          "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic,
partition, r), AnyLogDir)).asJava
+        ).asJava
+      }.asJava
+    ).asJava)
   }
 
   // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 7a83cf3..cb59575 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -17,8 +17,6 @@
 
 package kafka.api
 
-import kafka.utils._
-
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0
@@ -43,6 +41,6 @@ case class LeaderAndIsr(leader: Int,
   def newEpochAndZkVersion = newLeaderAndIsr(leader, isr)
 
   override def toString: String = {
-    Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" ->
isr))
+    s"LeaderAndIsr(leader=$leader, leaderEpoch=$leaderEpoch, isr=$isr, zkVersion=$zkVersion)"
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index a148dfd..df3be98 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -24,6 +24,9 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.Map
+import scala.collection.JavaConverters._
+
 /**
  * A Kafka broker.
  * A broker has an id and a collection of end-points.
@@ -127,12 +130,12 @@ object Broker {
     }
   }
 
-  def toJson(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
-             rack: Option[String]): String = {
+  def toJsonBytes(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
+                  rack: Option[String]): Array[Byte] = {
     val jsonMap = collection.mutable.Map(VersionKey -> version,
       HostKey -> host,
       PortKey -> port,
-      EndpointsKey -> advertisedEndpoints.map(_.connectionString).toArray,
+      EndpointsKey -> advertisedEndpoints.map(_.connectionString).toBuffer.asJava,
       JmxPortKey -> jmxPort,
       TimestampKey -> Time.SYSTEM.milliseconds().toString
     )
@@ -141,10 +144,9 @@ object Broker {
     if (version >= 4) {
       jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint =>
         endPoint.listenerName.value -> endPoint.securityProtocol.name
-      }.toMap)
+      }.toMap.asJava)
     }
-
-    Json.encode(jsonMap)
+    Json.encodeAsBytes(jsonMap.asJava)
   }
 }
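
The `toJson` -> `toJsonBytes` rename is what removes the intermediate
string: Jackson emits UTF-8 bytes directly, and the one caller that
still needs a `String` (`ZkUtils.registerBrokerInZk`, further down)
decodes once at the edge. A before/after sketch with an illustrative
map rather than the real broker payload:

    import java.nio.charset.StandardCharsets
    import scala.collection.JavaConverters._
    import kafka.utils.Json

    val jsonMap = Map("version" -> 4, "host" -> "localhost", "port" -> 9092)

    // Before: encode to a String, then re-encode that String as bytes.
    val before: Array[Byte] =
      Json.legacyEncodeAsString(jsonMap).getBytes(StandardCharsets.UTF_8)

    // After: Jackson writes the UTF-8 bytes in one step.
    val after: Array[Byte] = Json.encodeAsBytes(jsonMap.asJava)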
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index bb5fc0f..759da4f 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -45,6 +45,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import scala.collection._
 import scala.collection.JavaConverters._
 
+
 /**
  * This class handles the consumers interaction with zookeeper
  *
@@ -272,8 +273,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
     val timestamp = Time.SYSTEM.milliseconds.toString
-    val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
-                                                  "timestamp" -> timestamp))
+
+    val consumerRegistrationInfo = Json.encodeAsString(Map("version" -> 1,
+      "subscription" -> topicCount.getTopicCountMap.asJava,
+      "pattern" -> topicCount.pattern,
+      "timestamp" -> timestamp
+    ).asJava)
+
     val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
                                                     consumerRegistryDir + "/" + consumerIdString,
                                                     consumerRegistrationInfo,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 5d32085..716e3d1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -22,6 +22,8 @@ import kafka.common.KafkaException
 import kafka.utils.{Json, Logging, ZkUtils}
 import kafka.zk.KafkaZkClient
 
+import scala.collection.JavaConverters._
+
 /**
  * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
  * such that the same producerId will not be assigned twice across multiple transaction coordinators.
@@ -37,7 +39,7 @@ object ProducerIdManager extends Logging {
     Json.encodeAsBytes(Map("version" -> CurrentVersion,
       "broker" -> producerIdBlock.brokerId,
       "block_start" -> producerIdBlock.blockStartId.toString,
-      "block_end" -> producerIdBlock.blockEndId.toString)
+      "block_end" -> producerIdBlock.blockEndId.toString).asJava
     )
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index 4e2cba4..67f3d95 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -20,6 +20,7 @@ package kafka.security.auth
 import kafka.utils.Json
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.SecurityUtils
+import scala.collection.JavaConverters._
 
 object Acl {
   val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
@@ -71,7 +72,7 @@ object Acl {
   }
 
   def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
-    Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap).toList)
+    Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap.asJava).toList.asJava)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fe82dc2..127c570 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 
-import scala.collection.mutable
+import scala.collection.{Map, mutable}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 
@@ -334,12 +334,14 @@ object DumpLogSegments {
         }
       }.mkString("{", ",", "}")
 
-      val keyString = Json.encode(Map("metadata" -> groupId))
-      val valueString = Json.encode(Map(
-          "protocolType" -> protocolType,
-          "protocol" -> group.protocol,
-          "generationId" -> group.generationId,
-          "assignment" -> assignment))
+      val keyString = Json.encodeAsString(Map("metadata" -> groupId).asJava)
+
+      val valueString = Json.encodeAsString(Map(
+        "protocolType" -> protocolType,
+        "protocol" -> group.protocol,
+        "generationId" -> group.generationId,
+        "assignment" -> assignment
+      ).asJava)
 
       (Some(keyString), Some(valueString))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index c654d45..e8e7d8a 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -59,9 +59,11 @@ object Json {
    *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
    * Any other type will result in an exception.
    * 
-   * This method does not properly handle non-ascii characters. 
+   * This implementation is inefficient, so we recommend `encodeAsString` or `encodeAsBytes` (the latter is preferred
+   * if possible). This method supports scala Map implementations while the other two do not. Once this functionality
+   * is no longer required, we can remove this method.
    */
-  def encode(obj: Any): String = {
+  def legacyEncodeAsString(obj: Any): String = {
     obj match {
       case null => "null"
       case b: Boolean => b.toString
@@ -69,22 +71,26 @@ object Json {
       case n: Number => n.toString
       case m: Map[_, _] => "{" +
         m.map {
-          case (k, v) => encode(k) + ":" + encode(v)
+          case (k, v) => legacyEncodeAsString(k) + ":" + legacyEncodeAsString(v)
           case elem => throw new IllegalArgumentException(s"Invalid map element '$elem' in $obj")
         }.mkString(",") + "}"
-      case a: Array[_] => encode(a.toSeq)
-      case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]"
+      case a: Array[_] => legacyEncodeAsString(a.toSeq)
+      case i: Iterable[_] => "[" + i.map(legacyEncodeAsString).mkString(",") + "]"
       case other: AnyRef => throw new IllegalArgumentException(s"Unknown argument of type ${other.getClass}: $other")
     }
   }
 
   /**
-   * Encode an object into a JSON value in bytes. This method accepts any type T where
-   *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
-   * Any other type will result in an exception.
-   *
-   * This method does not properly handle non-ascii characters.
-   */
-  def encodeAsBytes(obj: Any): Array[Byte] = encode(obj).getBytes(StandardCharsets.UTF_8)
+    * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
+   * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
+   * a jackson-scala dependency).
+    */
+  def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)
 
+  /**
+   * Encode an object into a JSON value in bytes. This method accepts any type supported by Jackson's ObjectMapper in
+   * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
+   * a jackson-scala dependency).
+   */
+  def encodeAsBytes(obj: Any): Array[Byte] = mapper.writeValueAsBytes(obj)
 }
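
Why `.asJava` at every Jackson call site: the default `ObjectMapper`
understands `java.util.Map` and `java.util.List`, but a Scala
collection has no bean properties for it to discover, so without the
jackson-module-scala dependency (deliberately avoided here) it does
not serialize usefully. A quick illustration; the exact failure mode
for the plain Scala map depends on the Jackson version:

    import com.fasterxml.jackson.databind.ObjectMapper
    import scala.collection.JavaConverters._

    val mapper = new ObjectMapper

    // A Java map serializes to a JSON object out of the box:
    mapper.writeValueAsString(Map("a" -> 1).asJava) // {"a":1}

    // A plain Scala Map would need jackson-module-scala registered;
    // hence the `.asJava` conversions throughout this diff.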

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index b378280..2c079e5 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,6 +17,7 @@
 
 package kafka.utils
 
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.admin._
@@ -198,15 +199,15 @@ object ZkUtils {
   }
 
   def controllerZkData(brokerId: Int, timestamp: Long): String = {
-    Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
+    Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp"
-> timestamp.toString))
   }
 
   def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = {
-    Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))))
+    Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))))
   }
 
   def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
-    Json.encode(Map(
+    Json.legacyEncodeAsString(Map(
       "version" -> 1,
       "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition),
replicas) =>
         Map(
@@ -315,8 +316,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   object ClusterId {
 
     def toJson(id: String) = {
-      val jsonMap = Map("version" -> "1", "id" -> id)
-      Json.encode(jsonMap)
+      Json.legacyEncodeAsString(Map("version" -> "1", "id" -> id))
     }
 
     def fromJson(clusterIdJson: String): String = {
@@ -457,7 +457,8 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     val brokerIdPath = BrokerIdsPath + "/" + id
     // see method documentation for reason why we do this
     val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
-    val json = Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack)
+    val json = new String(Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack),
+      StandardCharsets.UTF_8)
     registerBrokerInZk(brokerIdPath, json)
 
     info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
@@ -486,7 +487,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   }
 
   def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
-    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch"
-> leaderAndIsr.leaderEpoch,
+    Json.legacyEncodeAsString(Map("version" -> 1, "leader" -> leaderAndIsr.leader,
"leader_epoch" -> leaderAndIsr.leaderEpoch,
                     "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
   }
 
@@ -494,7 +495,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
    * Get JSON partition to replica map from zookeeper.
    */
   def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
-    Json.encode(Map("version" -> 1, "partitions" -> map))
+    Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> map))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 8bd32d0..2223001 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -27,13 +27,15 @@ import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
+import scala.collection.JavaConverters._
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
 object ControllerZNode {
   def path = "/controller"
-  def encode(brokerId: Int, timestamp: Long): Array[Byte] =
-    Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" ->
timestamp.toString))
+  def encode(brokerId: Int, timestamp: Long): Array[Byte] = {
+    Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" ->
timestamp.toString).asJava)
+  }
   def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
     js.asJsonObject("brokerid").to[Int]
   }
@@ -68,7 +70,7 @@ object BrokerIdZNode {
              rack: Option[String],
              apiVersion: ApiVersion): Array[Byte] = {
     val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
-    Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes(UTF_8)
+    Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack)
   }
 
   def decode(id: Int, bytes: Array[Byte]): Broker = {
@@ -83,8 +85,10 @@ object TopicsZNode {
 object TopicZNode {
   def path(topic: String) = s"${TopicsZNode.path}/$topic"
   def encode(assignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
-    val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
-    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson))
+    val assignmentJson = assignment.map { case (partition, replicas) =>
+      partition.partition.toString -> replicas.asJava
+    }
+    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson.asJava).asJava)
   }
   def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = {
     Json.parseBytes(bytes).flatMap { js =>
@@ -113,7 +117,7 @@ object TopicPartitionStateZNode {
     val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
     val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
     Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch"
-> leaderAndIsr.leaderEpoch,
-      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
+      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr.asJava).asJava)
   }
   def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
     Json.parseBytes(bytes).map { js =>
@@ -135,8 +139,7 @@ object ConfigEntityTypeZNode {
 object ConfigEntityZNode {
   def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName"
   def encode(config: Properties): Array[Byte] = {
-    import scala.collection.JavaConverters._
-    Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala))
+    Json.encodeAsBytes(Map("version" -> 1, "config" -> config).asJava)
   }
   def decode(bytes: Array[Byte]): Properties = {
     val props = new Properties()
@@ -157,8 +160,8 @@ object ConfigEntityChangeNotificationZNode {
 object ConfigEntityChangeNotificationSequenceZNode {
   val SequenceNumberPrefix = "config_change_"
   def createPath = s"${ConfigEntityChangeNotificationZNode.path}/$SequenceNumberPrefix"
-  def encode(sanitizedEntityPath : String): Array[Byte] = Json.encodeAsBytes(Map("version" -> 2, "entity_path" -> sanitizedEntityPath))
-  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+  def encode(sanitizedEntityPath: String): Array[Byte] = Json.encodeAsBytes(
+    Map("version" -> 2, "entity_path" -> sanitizedEntityPath).asJava)
 }
 
 object IsrChangeNotificationZNode {
@@ -169,8 +172,8 @@ object IsrChangeNotificationSequenceZNode {
   val SequenceNumberPrefix = "isr_change_"
   def path(sequenceNumber: String = "") = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
   def encode(partitions: collection.Set[TopicPartition]): Array[Byte] = {
-    val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
-    Json.encodeAsBytes(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitionsJson))
+    val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition).asJava)
+    Json.encodeAsBytes(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitionsJson.asJava).asJava)
   }
 
   def decode(bytes: Array[Byte]): Set[TopicPartition] = {
@@ -195,8 +198,9 @@ object LogDirEventNotificationSequenceZNode {
   val SequenceNumberPrefix = "log_dir_event_"
   val LogDirFailureEvent = 1
   def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(brokerId: Int) =
-    Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent))
+  def encode(brokerId: Int) = {
+    Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent).asJava)
+  }
   def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
     js.asJsonObject("broker").to[Int]
   }
@@ -219,9 +223,9 @@ object ReassignPartitionsZNode {
   def path = s"${AdminZNode.path}/reassign_partitions"
   def encode(reassignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
     val reassignmentJson = reassignment.map { case (tp, replicas) =>
-      Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas)
-    }
-    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson))
+      Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas.asJava).asJava
+    }.asJava
+    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson).asJava)
   }
   def decode(bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
     val reassignmentJson = js.asJsonObject
@@ -242,8 +246,8 @@ object PreferredReplicaElectionZNode {
   def path = s"${AdminZNode.path}/preferred_replica_election"
   def encode(partitions: Set[TopicPartition]): Array[Byte] = {
     val jsonMap = Map("version" -> 1,
-      "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition"
-> tp.partition)))
-    Json.encodeAsBytes(jsonMap)
+      "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition"
-> tp.partition).asJava).asJava)
+    Json.encodeAsBytes(jsonMap.asJava)
   }
   def decode(bytes: Array[Byte]): Set[TopicPartition] = Json.parseBytes(bytes).map { js =>
     val partitionsJson = js.asJsonObject("partitions").asJsonArray
@@ -296,7 +300,9 @@ object ResourceTypeZNode {
 
 object ResourceZNode {
   def path(resource: Resource) = s"${AclZNode.path}/${resource.resourceType}/${resource.name}"
-  def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls))
+  def encode(acls: Set[Acl]): Array[Byte] = {
+    Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
+  }
   def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 0c9bd6e..d1e758d 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -532,8 +532,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
     // Write config without notification to ZK.
     val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
-    val map = Map("version" -> 1, "config" -> configMap)
-    zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encode(map))
+    val map = Map("version" -> 1, "config" -> configMap.asJava)
+    zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
 
     val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
     assertEquals("Must have 1 overriden client config", 1, configInZk.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 3012112..12fcba6 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -153,7 +153,7 @@ private object PartitionAssignorTest extends Logging {
 
   private case class StaticSubscriptionInfo(streamCounts: Map[String, Int]) extends SubscriptionInfo {
     def registrationString =
-      Json.encode(Map("version" -> 1,
+      Json.legacyEncodeAsString(Map("version" -> 1,
                       "subscription" -> streamCounts,
                       "pattern" -> "static",
                       "timestamp" -> 1234.toString))
@@ -166,7 +166,7 @@ private object PartitionAssignorTest extends Logging {
   private case class WildcardSubscriptionInfo(streamCount: Int, regex: String, isWhitelist: Boolean)
           extends SubscriptionInfo {
     def registrationString =
-      Json.encode(Map("version" -> 1,
+      Json.legacyEncodeAsString(Map("version" -> 1,
                       "subscription" -> Map(regex -> streamCount),
                       "pattern" -> (if (isWhitelist) "white_list" else "black_list")))
 
@@ -206,7 +206,7 @@ private object PartitionAssignorTest extends Logging {
     scenario.topicPartitionCounts.foreach { case(topic, partitionCount) =>
       val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*)
       EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat()))
-              .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
+        .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
       EasyMock.expectLastCall().anyTimes()
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
index dfdd85f..beeac37 100644
--- a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
@@ -22,6 +22,7 @@ import kafka.utils.Json
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.{Assert, Test}
 import org.scalatest.junit.JUnitSuite
+import scala.collection.JavaConverters._
 
 class AclTest extends JUnitSuite {
 
@@ -36,9 +37,9 @@ class AclTest extends JUnitSuite {
     val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read)
 
     val acls = Set[Acl](acl1, acl2, acl3)
-    val jsonAcls = Json.encode(Acl.toJsonCompatibleMap(acls))
+    val jsonAcls = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
 
-    Assert.assertEquals(acls, Acl.fromBytes(jsonAcls.getBytes(UTF_8)))
+    Assert.assertEquals(acls, Acl.fromBytes(jsonAcls))
     Assert.assertEquals(acls, Acl.fromBytes(AclJson.getBytes(UTF_8)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 61da420..d596d0f 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -31,6 +31,7 @@ import kafka.admin.AdminOperationException
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.Map
+import scala.collection.JavaConverters._
 
 class DynamicConfigChangeTest extends KafkaServerTestHarness {
   def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@@ -195,7 +196,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Incorrect Map. No version
     try {
       val jsonMap = Map("v" -> 1, "x" -> 2)
-      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -204,7 +205,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Version is provided. EntityType is incorrect
     try {
       val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" ->
"x")
-      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -214,7 +215,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // EntityName isn't provided
     try {
       val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
-      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -223,7 +224,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     // Everything is provided
     val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name"
-> "x")
-    configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
+    configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava))
 
     // Verify that processConfigChanges was only called once
     EasyMock.verify(handler)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/test/scala/unit/kafka/utils/JsonTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
index 93509b4..fa2a030 100644
--- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.utils
 
+import java.nio.charset.StandardCharsets
+
 import org.junit.Assert._
 import org.junit.Test
 import com.fasterxml.jackson.databind.JsonNode
@@ -47,7 +49,7 @@ class JsonTest {
 
     // Test with encoder that properly escapes backslash and quotes
     val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""")
-    val encoded = Json.encode(map)
+    val encoded = Json.legacyEncodeAsString(map)
     val decoded = Json.parseFull(encoded)
     assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded)
 
@@ -57,24 +59,71 @@ class JsonTest {
   }
 
   @Test
-  def testJsonEncoding() {
-    assertEquals("null", Json.encode(null))
-    assertEquals("1", Json.encode(1))
-    assertEquals("1", Json.encode(1L))
-    assertEquals("1", Json.encode(1.toByte))
-    assertEquals("1", Json.encode(1.toShort))
-    assertEquals("1.0", Json.encode(1.0))
-    assertEquals("\"str\"", Json.encode("str"))
-    assertEquals("true", Json.encode(true))
-    assertEquals("false", Json.encode(false))
-    assertEquals("[]", Json.encode(Seq()))
-    assertEquals("[1,2,3]", Json.encode(Seq(1,2,3)))
-    assertEquals("[1,\"2\",[3]]", Json.encode(Seq(1,"2",Seq(3))))
-    assertEquals("{}", Json.encode(Map()))
-    assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2)))
-    assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" ->
Seq(3,4))))
-    assertEquals(""""str1\\,str2"""", Json.encode("""str1\,str2"""))
-    assertEquals(""""\"quoted\""""", Json.encode(""""quoted""""))
+  def testLegacyEncodeAsString() {
+    assertEquals("null", Json.legacyEncodeAsString(null))
+    assertEquals("1", Json.legacyEncodeAsString(1))
+    assertEquals("1", Json.legacyEncodeAsString(1L))
+    assertEquals("1", Json.legacyEncodeAsString(1.toByte))
+    assertEquals("1", Json.legacyEncodeAsString(1.toShort))
+    assertEquals("1.0", Json.legacyEncodeAsString(1.0))
+    assertEquals(""""str"""", Json.legacyEncodeAsString("str"))
+    assertEquals("true", Json.legacyEncodeAsString(true))
+    assertEquals("false", Json.legacyEncodeAsString(false))
+    assertEquals("[]", Json.legacyEncodeAsString(Seq()))
+    assertEquals("[1,2,3]", Json.legacyEncodeAsString(Seq(1,2,3)))
+    assertEquals("""[1,"2",[3]]""", Json.legacyEncodeAsString(Seq(1,"2",Seq(3))))
+    assertEquals("{}", Json.legacyEncodeAsString(Map()))
+    assertEquals("""{"a":1,"b":2}""", Json.legacyEncodeAsString(Map("a" -> 1, "b" ->
2)))
+    assertEquals("""{"a":[1,2],"c":[3,4]}""", Json.legacyEncodeAsString(Map("a" -> Seq(1,2),
"c" -> Seq(3,4))))
+    assertEquals(""""str1\\,str2"""", Json.legacyEncodeAsString("""str1\,str2"""))
+    assertEquals(""""\"quoted\""""", Json.legacyEncodeAsString(""""quoted""""))
+
+  }
+
+  @Test
+  def testEncodeAsString() {
+    assertEquals("null", Json.encodeAsString(null))
+    assertEquals("1", Json.encodeAsString(1))
+    assertEquals("1", Json.encodeAsString(1L))
+    assertEquals("1", Json.encodeAsString(1.toByte))
+    assertEquals("1", Json.encodeAsString(1.toShort))
+    assertEquals("1.0", Json.encodeAsString(1.0))
+    assertEquals(""""str"""", Json.encodeAsString("str"))
+    assertEquals("true", Json.encodeAsString(true))
+    assertEquals("false", Json.encodeAsString(false))
+    assertEquals("[]", Json.encodeAsString(Seq().asJava))
+    assertEquals("[null]", Json.encodeAsString(Seq(null).asJava))
+    assertEquals("[1,2,3]", Json.encodeAsString(Seq(1,2,3).asJava))
+    assertEquals("""[1,"2",[3],null]""", Json.encodeAsString(Seq(1,"2",Seq(3).asJava,null).asJava))
+    assertEquals("{}", Json.encodeAsString(Map().asJava))
+    assertEquals("""{"a":1,"b":2,"c":null}""", Json.encodeAsString(Map("a" -> 1, "b" ->
2, "c" -> null).asJava))
+    assertEquals("""{"a":[1,2],"c":[3,4]}""", Json.encodeAsString(Map("a" -> Seq(1,2).asJava,
"c" -> Seq(3,4).asJava).asJava))
+    assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", Json.encodeAsString(Map("a" ->
Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava))
+    assertEquals(""""str1\\,str2"""", Json.encodeAsString("""str1\,str2"""))
+    assertEquals(""""\"quoted\""""", Json.encodeAsString(""""quoted""""))
+  }
+
+  @Test
+  def testEncodeAsBytes() {
+    assertEquals("null", new String(Json.encodeAsBytes(null), StandardCharsets.UTF_8))
+    assertEquals("1", new String(Json.encodeAsBytes(1), StandardCharsets.UTF_8))
+    assertEquals("1", new String(Json.encodeAsBytes(1L), StandardCharsets.UTF_8))
+    assertEquals("1", new String(Json.encodeAsBytes(1.toByte), StandardCharsets.UTF_8))
+    assertEquals("1", new String(Json.encodeAsBytes(1.toShort), StandardCharsets.UTF_8))
+    assertEquals("1.0", new String(Json.encodeAsBytes(1.0), StandardCharsets.UTF_8))
+    assertEquals(""""str"""",  new String(Json.encodeAsBytes("str"), StandardCharsets.UTF_8))
+    assertEquals("true", new String(Json.encodeAsBytes(true), StandardCharsets.UTF_8))
+    assertEquals("false", new String(Json.encodeAsBytes(false), StandardCharsets.UTF_8))
+    assertEquals("[]", new String(Json.encodeAsBytes(Seq().asJava), StandardCharsets.UTF_8))
+    assertEquals("[null]", new String(Json.encodeAsBytes(Seq(null).asJava), StandardCharsets.UTF_8))
+    assertEquals("[1,2,3]", new String(Json.encodeAsBytes(Seq(1,2,3).asJava), StandardCharsets.UTF_8))
+    assertEquals("""[1,"2",[3],null]""", new String(Json.encodeAsBytes(Seq(1,"2",Seq(3).asJava,null).asJava),
StandardCharsets.UTF_8))
+    assertEquals("{}", new String(Json.encodeAsBytes(Map().asJava), StandardCharsets.UTF_8))
+    assertEquals("""{"a":1,"b":2,"c":null}""", new String(Json.encodeAsBytes(Map("a" ->
1, "b" -> 2, "c" -> null).asJava), StandardCharsets.UTF_8))
+    assertEquals("""{"a":[1,2],"c":[3,4]}""", new String(Json.encodeAsBytes(Map("a" ->
Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava), StandardCharsets.UTF_8))
+    assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", new String(Json.encodeAsBytes(Map("a"
-> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava), StandardCharsets.UTF_8))
+    assertEquals(""""str1\\,str2"""", new String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8))
+    assertEquals(""""\"quoted\""""", new String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8))
   }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 987160d..05f8379 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.easymock.EasyMock
+import scala.collection.JavaConverters._
 
 class ReplicationUtilsTest extends ZooKeeperTestHarness {
   private val zkVersion = 1
@@ -34,8 +35,8 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
   private val controllerEpoch = 1
   private val isr = List(1, 2)
   private val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
-  private val topicData = Json.encode(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
-    "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr))
+  private val topicData = Json.encodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
+    "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr).asJava)
 
   @Before
   override def setUp() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index ecd0706..9f78124 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -96,7 +96,7 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
     val controllerEpoch = 1
     val isr = List(1, 2)
     val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
-    val topicData = Json.encode(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
+    val topicData = Json.legacyEncodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
       "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr))
     zkUtils.createPersistentPath(topicPath, topicData)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a508a43/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 2436241..6f30e7a 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -73,7 +73,7 @@ versions += [
   zkclient: "0.10",
   zookeeper: "3.4.10",
   jfreechart: "1.0.0",
-  mavenArtifact: "3.5.0",
+  mavenArtifact: "3.5.0"
 ]
 
 libs += [

