From: ijuma@apache.org
To: commits@kafka.apache.org
Date: Sat, 31 Mar 2018 14:57:31 +0000
Subject: [kafka] branch trunk updated: KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4106cb1  KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)
4106cb1 is described below

commit 4106cb1db18cc45cbbeb54546a7468a4727bd88f
Author: Nick Travers
AuthorDate: Sat Mar 31 07:57:19 2018 -0700

    KAFKA-4914: Partition reassignment tool should check types before persisting state in ZooKeeper (#2708)

    Prior to this change, invalid data could be persisted in ZooKeeper,
    causing ClassCastExceptions when a broker was restarted and read the
    type-unsafe data back.

    This patch adds basic structural and type validation for the
    reassignment JSON by introducing Scala case classes that map onto the
    expected JSON structure. The same case classes are also used to
    deserialize the JSON, avoiding duplication.

    Reviewers: Manikumar Reddy, Viktor Somogyi, Ismael Juma
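In outline, the change works as in the self-contained sketch below (the object and method names here are illustrative, not part of the patch): Jackson binds the JSON onto case classes whose field types are declared up front, so a structural or type mismatch is rejected at parse time instead of surfacing as a ClassCastException on broker restart.

    import com.fasterxml.jackson.annotation.JsonProperty
    import com.fasterxml.jackson.core.JsonProcessingException
    import com.fasterxml.jackson.databind.ObjectMapper

    import scala.beans.BeanProperty

    object ReassignmentValidationSketch {
      // Case classes mirroring the expected JSON structure, as in ZkData.scala below.
      case class ReplicaAssignment(@BeanProperty @JsonProperty("topic") topic: String,
                                   @BeanProperty @JsonProperty("partition") partition: Int,
                                   @BeanProperty @JsonProperty("replicas") replicas: java.util.List[Int])

      case class PartitionAssignment(@BeanProperty @JsonProperty("version") version: Int,
                                     @BeanProperty @JsonProperty("partitions") partitions: java.util.List[ReplicaAssignment])

      private val mapper = new ObjectMapper()

      // Invalid input surfaces here as Left(JsonProcessingException) instead of
      // being persisted and failing with a ClassCastException much later.
      def validate(json: String): Either[JsonProcessingException, PartitionAssignment] =
        try Right(mapper.readValue(json, classOf[PartitionAssignment]))
        catch { case e: JsonProcessingException => Left(e) }

      def main(args: Array[String]): Unit = {
        // Well-formed reassignment JSON parses:
        println(validate("""{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2]}]}"""))
        // A type error ("partitions" is not a list) is rejected up front:
        println(validate("""{"version":1,"partitions":42}"""))
      }
    }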
---
 core/src/main/scala/kafka/utils/Json.scala         | 22 ++++++++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 20 +++-----
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 10 +++-
 core/src/main/scala/kafka/zk/ZkData.scala          | 54 ++++++++++++++-------
 .../src/test/scala/unit/kafka/utils/JsonTest.scala | 31 ++++++++++--
 .../kafka/zk/ReassignPartitionsZNodeTest.scala     | 56 ++++++++++++++++++++++
 6 files changed, 155 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index cbb8dac..c61e149 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.utils.json.JsonValue
 
 import scala.collection._
+import scala.reflect.ClassTag
 
 /**
  * Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation.
@@ -46,6 +47,15 @@
   }
 
   /**
+   * Parse a JSON string into either a generic type T, or a JsonProcessingException in the case of
+   * exception.
+   */
+  def parseStringAs[T](input: String)(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
+    try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
+    catch { case e: JsonProcessingException => Left(e) }
+  }
+
+  /**
    * Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
    */
   def parseBytes(input: Array[Byte]): Option[JsonValue] =
@@ -57,6 +67,14 @@
     catch { case e: JsonProcessingException => Left(e) }
 
   /**
+   * Parse a JSON byte array into either a generic type T, or a JsonProcessingException in the case of exception.
+   */
+  def parseBytesAs[T](input: Array[Byte])(implicit tag: ClassTag[T]): Either[JsonProcessingException, T] = {
+    try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
+    catch { case e: JsonProcessingException => Left(e) }
+  }
+
+  /**
    * Encode an object into a JSON string. This method accepts any type T where
    * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
    * Any other type will result in an exception.
@@ -83,10 +101,10 @@
   }
 
   /**
-   * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in 
+   * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in
    * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid
    * a jackson-scala dependency).
-   */ 
+   */
   def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)
 
   /**
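The new helpers return an Either rather than throwing, so call sites handle malformed input explicitly. A minimal hypothetical usage (TestConfig is illustrative, mirroring the TestObject case class added to JsonTest further down):

    import com.fasterxml.jackson.annotation.JsonProperty
    import kafka.utils.Json

    object ParseStringAsDemo {
      // Illustrative target type; Jackson binds the named JSON fields onto it.
      case class TestConfig(@JsonProperty("name") name: String, @JsonProperty("retries") retries: Int)

      def main(args: Array[String]): Unit = {
        Json.parseStringAs[TestConfig]("""{"name":"a","retries":3}""") match {
          case Right(config) => println(s"parsed: $config")           // parsed: TestConfig(a,3)
          case Left(e)       => println(s"rejected: ${e.getMessage}")
        }
      }
    }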
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d5fde4d..0c16243 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -26,7 +26,7 @@ import kafka.cluster._
 import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
-import kafka.zk.{BrokerIdZNode, ZkData}
+import kafka.zk.{BrokerIdZNode, ReassignPartitionsZNode, ZkData}
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
@@ -142,20 +142,14 @@
 
   def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic
 
-  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
   def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
-    val seq = for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } yield {
-      val partitionFields = p.asJsonObject
-      val topic = partitionFields("topic").to[String]
-      val partition = partitionFields("partition").to[Int]
-      val newReplicas = partitionFields("replicas").to[Seq[Int]]
-      TopicAndPartition(topic, partition) -> newReplicas
+    val utf8Bytes = jsonData.getBytes(StandardCharsets.UTF_8)
+    val assignments = ReassignPartitionsZNode.decode(utf8Bytes) match {
+      case Left(e) => throw e
+      case Right(result) => result
     }
-    seq.toMap
+
+    assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
   }
 
   def parseTopicsData(jsonData: String): Seq[String] = {
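Note the behavioral change here: previously, unparseable input made the for-comprehension yield an empty sequence, so parsePartitionReassignmentData silently returned an empty map; it now rethrows the JsonProcessingException, so the reassignment tool fails fast before any state is written to ZooKeeper. A rough illustration (the demo object is hypothetical):

    import kafka.utils.ZkUtils

    object FailFastDemo {
      def main(args: Array[String]): Unit = {
        val valid = """{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2]}]}"""
        // Parses into a single mapping: partition [foo,0] -> replicas 1, 2.
        println(ZkUtils.parsePartitionReassignmentData(valid))

        // Before this patch this returned an empty map; it now throws a
        // JsonProcessingException, surfacing the bad input immediately.
        ZkUtils.parsePartitionReassignmentData("{invalid json}")
      }
    }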
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 0a2d96a..9b58fc7 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -666,11 +666,17 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * Returns all reassignments.
    * @return the reassignments for each partition.
    */
-  def getPartitionReassignment: Map[TopicPartition, Seq[Int]] = {
+  def getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]] = {
     val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
-      case Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
+      case Code.OK =>
+        ReassignPartitionsZNode.decode(getDataResponse.data) match {
+          case Left(e) =>
+            logger.warn(s"Ignoring partition reassignment due to invalid json: ${e.getMessage}", e)
+            Map.empty[TopicPartition, Seq[Int]]
+          case Right(assignments) => assignments
+        }
       case Code.NONODE => Map.empty
       case _ => throw getDataResponse.resultException.get
     }
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index b6352fa..fbed11d 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -19,6 +19,8 @@ package kafka.zk
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonProcessingException
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.KafkaException
@@ -35,9 +37,10 @@
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.data.{ACL, Stat}
 
+import scala.beans.BeanProperty
 import scala.collection.JavaConverters._
-import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, breakOut}
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
@@ -367,26 +370,41 @@ object DeleteTopicsTopicZNode {
 }
 
 object ReassignPartitionsZNode {
+
+  /**
+   * The assignment of brokers for a `TopicPartition`.
+   *
+   * A replica assignment consists of a `topic`, `partition` and a list of `replicas`, which
+   * represent the broker ids that the `TopicPartition` is assigned to.
+   */
+  case class ReplicaAssignment(@BeanProperty @JsonProperty("topic") topic: String,
+                               @BeanProperty @JsonProperty("partition") partition: Int,
+                               @BeanProperty @JsonProperty("replicas") replicas: java.util.List[Int])
+
+  /**
+   * An assignment consists of a `version` and a list of `partitions`, which represent the
+   * assignment of topic-partitions to brokers.
+   */
+  case class PartitionAssignment(@BeanProperty @JsonProperty("version") version: Int,
+                                 @BeanProperty @JsonProperty("partitions") partitions: java.util.List[ReplicaAssignment])
+
   def path = s"${AdminZNode.path}/reassign_partitions"
-  def encode(reassignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
-    val reassignmentJson = reassignment.map { case (tp, replicas) =>
-      Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas.asJava).asJava
-    }.asJava
-    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson).asJava)
+
+  def encode(reassignmentMap: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
+    val reassignment = PartitionAssignment(1,
+      reassignmentMap.toSeq.map { case (tp, replicas) =>
+        ReplicaAssignment(tp.topic, tp.partition, replicas.asJava)
+      }.asJava
+    )
+    Json.encodeAsBytes(reassignment)
   }
 
-  def decode(bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
-    val reassignmentJson = js.asJsonObject
-    val partitionsJsonOpt = reassignmentJson.get("partitions")
-    partitionsJsonOpt.map { partitionsJson =>
-      partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
-        val partitionFields = partitionFieldsJs.asJsonObject
-        val topic = partitionFields("topic").to[String]
-        val partition = partitionFields("partition").to[Int]
-        val replicas = partitionFields("replicas").to[Seq[Int]]
-        new TopicPartition(topic, partition) -> replicas
-      }
+
+  def decode(bytes: Array[Byte]): Either[JsonProcessingException, collection.Map[TopicPartition, Seq[Int]]] =
+    Json.parseBytesAs[PartitionAssignment](bytes).right.map { partitionAssignment =>
+      partitionAssignment.partitions.asScala.map { replicaAssignment =>
+        new TopicPartition(replicaAssignment.topic, replicaAssignment.partition) -> replicaAssignment.replicas.asScala
+      }(breakOut)
     }
-  }.map(_.toMap).getOrElse(Map.empty)
 }
 
 object PreferredReplicaElectionZNode {
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
index fa2a030..209fdee 100644
--- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
@@ -18,15 +18,22 @@ package kafka.utils
 
 import java.nio.charset.StandardCharsets
 
-import org.junit.Assert._
-import org.junit.Test
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node._
+import kafka.utils.JsonTest.TestObject
 import kafka.utils.json.JsonValue
+import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
+object JsonTest {
+  case class TestObject(@JsonProperty("foo") foo: String, @JsonProperty("bar") bar: Int)
+}
+
 class JsonTest {
 
   @Test
@@ -125,5 +132,23 @@
     assertEquals(""""str1\\,str2"""", new String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8))
     assertEquals(""""\"quoted\""""", new String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8))
   }
-
+
+  @Test
+  def testParseTo() = {
+    val foo = "baz"
+    val bar = 1
+
+    val result = Json.parseStringAs[TestObject](s"""{"foo": "$foo", "bar": $bar}""")
+
+    assertTrue(result.isRight)
+    assertEquals(TestObject(foo, bar), result.right.get)
+  }
+
+  @Test
+  def testParseToWithInvalidJson() = {
+    val result = Json.parseStringAs[TestObject]("{invalid json}")
+
+    assertTrue(result.isLeft)
+    assertEquals(classOf[JsonParseException],
+                 result.left.get.getClass)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala b/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala
new file mode 100644
index 0000000..2f3456f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ReassignPartitionsZNodeTest.scala
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import java.nio.charset.StandardCharsets
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import org.apache.kafka.common.TopicPartition
+import org.junit.Assert._
+import org.junit.Test
+
+class ReassignPartitionsZNodeTest {
+
+  private val topic = "foo"
+  private val partition1 = 0
+  private val replica1 = 1
+  private val replica2 = 2
+
+  private val reassignPartitionData = Map(new TopicPartition(topic, partition1) -> Seq(replica1, replica2))
+  private val reassignmentJson = """{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2]}]}"""
+
+  @Test
+  def testEncode() {
+    val encodedJsonString = new String(ReassignPartitionsZNode.encode(reassignPartitionData), StandardCharsets.UTF_8)
+    assertEquals(reassignmentJson, encodedJsonString)
+  }
+
+  @Test
+  def testDecodeInvalidJson() {
+    val result = ReassignPartitionsZNode.decode("invalid json".getBytes)
+    assertTrue(result.isLeft)
+    assertTrue(result.left.get.isInstanceOf[JsonProcessingException])
+  }
+
+  @Test
+  def testDecodeValidJson() {
+    val result = ReassignPartitionsZNode.decode(reassignmentJson.getBytes)
+    assertTrue(result.isRight)
+    val assignmentMap = result.right.get
+    assertEquals(Seq(replica1, replica2), assignmentMap(new TopicPartition(topic, partition1)))
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.