kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Make assignment expectation explicit in testConsumptionWithBrokerFailures
Date Fri, 28 Apr 2017 10:06:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f0152a7fd -> ca6ae8116


MINOR: Make assignment expectation explicit in testConsumptionWithBrokerFailures

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2930 from hachikuji/bouncetest-assert-assignment


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca6ae811
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca6ae811
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca6ae811

Branch: refs/heads/trunk
Commit: ca6ae811662fd291421c5e064dda067517fc624e
Parents: f0152a7
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Apr 28 11:05:59 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Apr 28 11:05:59 2017 +0100

----------------------------------------------------------------------
 .../scala/integration/kafka/api/ConsumerBounceTest.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ca6ae811/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 2198bf2..1ce95fb 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -100,12 +100,15 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging
{
     scheduler.start()
 
     while (scheduler.isRunning.get()) {
-      for (record <- consumer.poll(100).asScala) {
+      val records = consumer.poll(100).asScala
+      assertEquals(Set(tp), consumer.assignment.asScala)
+
+      for (record <- records) {
         assertEquals(consumed, record.offset())
         consumed += 1
       }
 
-      try {
+      if (records.nonEmpty) {
         consumer.commitSync()
         assertEquals(consumer.position(tp), consumer.committed(tp).offset)
 
@@ -113,10 +116,6 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging
{
           consumer.seekToBeginning(Collections.emptyList())
           consumed = 0
         }
-      } catch {
-        // TODO: should be no need to catch these exceptions once KAFKA-2017 is
-        // merged since coordinator fail-over will not cause a rebalance
-        case _: CommitFailedException =>
       }
     }
     scheduler.shutdown()


Mime
View raw message