kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-6324; Change LogSegment.delete to deleteIfExists and harden log recovery
Date Tue, 12 Dec 2017 08:00:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 72eec0a04 -> a5cd34d79


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index a0680a2..4a0d46e 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -20,7 +20,6 @@ package kafka.producer
 import java.net.SocketTimeoutException
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.api.{ProducerRequest, ProducerResponseStatus}
 import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index 9ff47dd..25b2fed 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.{Base64, Utils}
 import org.slf4j.event.Level
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
@@ -46,6 +47,56 @@ class CoreUtilsTest extends JUnitSuite with Logging {
   }
 
   @Test
+  def testTryAll(): Unit = {
+    case class TestException(key: String) extends Exception
+
+    val recorded = mutable.Map.empty[String, Either[TestException, String]]
+    def recordingFunction(v: Either[TestException, String]): Unit = {
+      val key = v match {
+        case Right(key) => key
+        case Left(e) => e.key
+      }
+      recorded(key) = v
+    }
+
+    CoreUtils.tryAll(Seq(
+      () => recordingFunction(Right("valid-0")),
+      () => recordingFunction(Left(new TestException("exception-1"))),
+      () => recordingFunction(Right("valid-2")),
+      () => recordingFunction(Left(new TestException("exception-3")))
+    ))
+    var expected = Map(
+      "valid-0" -> Right("valid-0"),
+      "exception-1" -> Left(TestException("exception-1")),
+      "valid-2" -> Right("valid-2"),
+      "exception-3" -> Left(TestException("exception-3"))
+    )
+    assertEquals(expected, recorded)
+
+    recorded.clear()
+    CoreUtils.tryAll(Seq(
+      () => recordingFunction(Right("valid-0")),
+      () => recordingFunction(Right("valid-1"))
+    ))
+    expected = Map(
+      "valid-0" -> Right("valid-0"),
+      "valid-1" -> Right("valid-1")
+    )
+    assertEquals(expected, recorded)
+
+    recorded.clear()
+    CoreUtils.tryAll(Seq(
+      () => recordingFunction(Left(new TestException("exception-0"))),
+      () => recordingFunction(Left(new TestException("exception-1")))
+    ))
+    expected = Map(
+      "exception-0" -> Left(TestException("exception-0")),
+      "exception-1" -> Left(TestException("exception-1"))
+    )
+    assertEquals(expected, recorded)
+  }
+
+  @Test
   def testCircularIterator() {
     val l = List(1, 2)
     val itl = CoreUtils.circularIterator(l)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index c5f383c..5ebdf40 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.Time
 class MockScheduler(val time: Time) extends Scheduler {
   
   /* a priority queue of tasks ordered by next execution time */
-  var tasks = new PriorityQueue[MockTask]()
+  private val tasks = new PriorityQueue[MockTask]()
   
   def isStarted = true
 
@@ -77,6 +77,12 @@ class MockScheduler(val time: Time) extends Scheduler {
       tick()
     }
   }
+
+  def clear(): Unit = {
+    this synchronized {
+      tasks.clear()
+    }
+  }
   
 }
 


Mime
View raw message