kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5919; Adding checks on "version" field for tools using it
Date Mon, 04 Jun 2018 21:33:56 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8f38df  KAFKA-5919; Adding checks on "version" field for tools using it
c8f38df is described below

commit c8f38dff71da356604cd357e92e4a9263bacafb4
Author: Paolo Patierno <ppatierno@live.com>
AuthorDate: Mon Jun 4 14:31:33 2018 -0700

    KAFKA-5919; Adding checks on "version" field for tools using it
    
    Adding checks on "version" field for tools using it.
    This is a new version of the closed PR #3887 (to see for more comments and related discussion).
    
    Author: Paolo Patierno <ppatierno@live.com>
    
    Reviewers: Dong Lin <lindong28@gmail.com>
    
    Closes #5126 from ppatierno/kafka-5919-update
---
 .../scala/kafka/admin/DeleteRecordsCommand.scala   | 37 ++++++---
 .../kafka/admin/ReassignPartitionsCommand.scala    | 91 ++++++++++++++++------
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  8 --
 3 files changed, 94 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 14d38ec..4a85d09 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin
 import org.apache.kafka.clients.admin.RecordsToDelete
 import org.apache.kafka.clients.CommonClientConfigs
 import joptsimple._
+import kafka.utils.json.JsonValue
 
 import scala.collection.JavaConverters._
 
@@ -36,20 +37,38 @@ import scala.collection.JavaConverters._
  */
 object DeleteRecordsCommand {
 
+  private[admin] val EarliestVersion = 1
+
   def main(args: Array[String]): Unit = {
     execute(args, System.out)
   }
 
   def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
-    Json.parseFull(jsonData).toSeq.flatMap { js =>
-      js.asJsonObject.get("partitions").toSeq.flatMap { partitionsJs =>
-        partitionsJs.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
-          val topic = partitionJs("topic").to[String]
-          val partition = partitionJs("partition").to[Int]
-          val offset = partitionJs("offset").to[Long]
-          new TopicPartition(topic, partition) -> offset
-        }.toBuffer
-      }
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parseJsonData(version, js)
 +      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  def parseJsonData(version: Int, js: JsonValue): Seq[(TopicPartition, Long)] = {
+    version match {
+      case 1 =>
+        js.asJsonObject.get("partitions") match {
+          case Some(partitions) =>
+            partitions.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
+              val topic = partitionJs("topic").to[String]
+              val partition = partitionJs("partition").to[Int]
+              val offset = partitionJs("offset").to[Long]
+              new TopicPartition(topic, partition) -> offset
+            }.toBuffer
+          case _ => throw new AdminOperationException("Missing partitions field");
+        }
 +      case _ => throw new AdminOperationException(s"Not supported version field value $version")
     }
   }
 
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index f765b94..4d9da90 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -25,6 +25,7 @@ import kafka.log.LogConfig
 import kafka.log.LogConfig._
 import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils._
+import kafka.utils.json.JsonValue
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
 import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
@@ -45,6 +46,8 @@ object ReassignPartitionsCommand extends Logging {
   private[admin] val NoThrottle = Throttle(-1, -1)
   private[admin] val AnyLogDir = "any"
 
+  private[admin] val EarliestVersion = 1
+
   def main(args: Array[String]): Unit = {
     val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
@@ -168,7 +171,7 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
-    val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+    val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
@@ -241,32 +244,70 @@ object ReassignPartitionsCommand extends Logging {
     ).asJava)
   }
 
 -  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parseTopicsData(jsonData: String): Seq[String] = {
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parseTopicsData(version, js)
 +      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  def parseTopicsData(version: Int, js: JsonValue): Seq[String] = {
+    version match {
+      case 1 =>
+        for {
+          partitionsSeq <- js.asJsonObject.get("topics").toSeq
+          p <- partitionsSeq.asJsonArray.iterator
+        } yield p.asJsonObject("topic").to[String]
 +      case _ => throw new AdminOperationException(s"Not supported version field value $version")
+    }
+  }
+
   def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
-    val partitionAssignment = mutable.ListBuffer.empty[(TopicPartition, Seq[Int])]
-    val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
-    for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } {
-      val partitionFields = p.asJsonObject
-      val topic = partitionFields("topic").to[String]
-      val partition = partitionFields("partition").to[Int]
-      val newReplicas = partitionFields("replicas").to[Seq[Int]]
-      val newLogDirs = partitionFields.get("log_dirs") match {
-        case Some(jsonValue) => jsonValue.to[Seq[String]]
-        case None => newReplicas.map(_ => AnyLogDir)
-      }
-      if (newReplicas.size != newLogDirs.size)
 -        throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
-          s"size of log dirs list $newLogDirs for partition ${new TopicPartition(topic, partition)}")
-      partitionAssignment += (new TopicPartition(topic, partition) -> newReplicas)
-      replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
-        new TopicPartitionReplica(topic, partition, replica) -> logDir
-      }.filter(_._2 != AnyLogDir)
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parsePartitionReassignmentData(version, js)
 +      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
 +  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
 +  def parsePartitionReassignmentData(version:Int, jsonData: JsonValue): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    version match {
+      case 1 =>
+        val partitionAssignment = mutable.ListBuffer.empty[(TopicPartition, Seq[Int])]
+        val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
+        for {
+          partitionsSeq <- jsonData.asJsonObject.get("partitions").toSeq
+          p <- partitionsSeq.asJsonArray.iterator
+        } {
+          val partitionFields = p.asJsonObject
+          val topic = partitionFields("topic").to[String]
+          val partition = partitionFields("partition").to[Int]
+          val newReplicas = partitionFields("replicas").to[Seq[Int]]
+          val newLogDirs = partitionFields.get("log_dirs") match {
+            case Some(jsonValue) => jsonValue.to[Seq[String]]
+            case None => newReplicas.map(_ => AnyLogDir)
+          }
+          if (newReplicas.size != newLogDirs.size)
 +            throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
 +              s"size of log dirs list $newLogDirs for partition ${new TopicPartition(topic, partition)}")
+          partitionAssignment += (new TopicPartition(topic, partition) -> newReplicas)
 +          replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
+            new TopicPartitionReplica(topic, partition, replica) -> logDir
+          }.filter(_._2 != AnyLogDir)
+        }
+        (partitionAssignment, replicaAssignment)
 +      case _ => throw new AdminOperationException(s"Not supported version field value $version")
     }
-    (partitionAssignment, replicaAssignment)
   }
 
   def parseAndValidate(zkClient: KafkaZkClient, reassignmentJsonString: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0c16243..0179937 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -152,14 +152,6 @@ object ZkUtils {
     assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
   }
 
-  def parseTopicsData(jsonData: String): Seq[String] = {
-    for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("topics").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } yield p.asJsonObject("topic").to[String]
-  }
-
   def controllerZkData(brokerId: Int, timestamp: Long): String = {
     Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
   }

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.

Mime
View raw message