kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
Date Sat, 24 Mar 2018 21:05:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412793#comment-16412793 ]

ASF GitHub Bot commented on KAFKA-6052:
---------------------------------------

hachikuji closed pull request #4705: KAFKA-6052: Fix the request retry issue (on Windows) in InterBrokerSendThread
URL: https://github.com/apache/kafka/pull/4705

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 70dae354aa4..c65e5572974 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -16,13 +16,17 @@
  */
 package kafka.common
 
+import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap, Iterator}
+import java.util.Map.Entry
+
 import kafka.utils.ShutdownableThread
-import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
 
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network client.
@@ -34,6 +38,10 @@ abstract class InterBrokerSendThread(name: String,
   extends ShutdownableThread(name, isInterruptible) {
 
   def generateRequests(): Iterable[RequestAndCompletionHandler]
+  def unsentExpiryMs: Int
+  private val unsentRequests = new UnsentRequests
+
+  def hasUnsentRequests = unsentRequests.iterator().hasNext
 
   override def shutdown(): Unit = {
     initiateShutdown()
@@ -43,35 +51,21 @@ abstract class InterBrokerSendThread(name: String,
   }
 
   override def doWork() {
-    val now = time.milliseconds()
-    var pollTimeout = Long.MaxValue
+    var now = time.milliseconds()
+
+    generateRequests().foreach { request =>
+      val completionHandler = request.handler
+      unsentRequests.put(request.destination,
+        networkClient.newClientRequest(request.destination.idString, request.request, now, true, completionHandler))
+    }
 
     try {
-      for (request: RequestAndCompletionHandler <- generateRequests()) {
-        val destination = Integer.toString(request.destination.id())
-        val completionHandler = request.handler
-        val clientRequest = networkClient.newClientRequest(destination,
-          request.request,
-          now,
-          true,
-          completionHandler)
-
-        if (networkClient.ready(request.destination, now)) {
-          networkClient.send(clientRequest, now)
-        } else {
-          val header = clientRequest.makeHeader(request.request.latestAllowedVersion)
-          val disconnectResponse: ClientResponse = new ClientResponse(header, completionHandler, destination,
-            now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ ,
-            null /* responseBody */)
-
-          // poll timeout would be the minimum of connection delay if there are any dest yet to be reached;
-          // otherwise it is infinity
-          pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))
-
-          completionHandler.onComplete(disconnectResponse)
-        }
-      }
-      networkClient.poll(pollTimeout, now)
+      val timeout = sendRequests(now)
+      networkClient.poll(timeout, now)
+      now = time.milliseconds()
+      checkDisconnects(now)
+      failExpiredRequests(now)
+      unsentRequests.clean()
     } catch {
       case e: FatalExitError => throw e
       case t: Throwable =>
@@ -84,9 +78,113 @@ abstract class InterBrokerSendThread(name: String,
     }
   }
 
-  def wakeup(): Unit = networkClient.wakeup()
+  private def sendRequests(now: Long): Long = {
+    var pollTimeout = Long.MaxValue
+    for (node <- unsentRequests.nodes.asScala) {
+      val requestIterator = unsentRequests.requestIterator(node)
+      while (requestIterator.hasNext) {
+        val request = requestIterator.next
+        if (networkClient.ready(node, now)) {
+          networkClient.send(request, now)
+          requestIterator.remove()
+        } else
+          pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(node, now))
+      }
+    }
+    pollTimeout
+  }
+
+  private def checkDisconnects(now: Long): Unit = {
+    // any disconnects affecting requests that have already been transmitted will be handled
+    // by NetworkClient, so we just need to check whether connections for any of the unsent
+    // requests have been disconnected; if they have, then we complete the corresponding future
+    // and set the disconnect flag in the ClientResponse
+    val iterator = unsentRequests.iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val (node, requests) = (entry.getKey, entry.getValue)
+      if (!requests.isEmpty && networkClient.connectionFailed(node)) {
+        iterator.remove()
+        for (request <- requests.asScala) {
+          if (networkClient.authenticationException(node) != null)
+            error(s"Failed to send the following request due to authentication error: $request")
+          completeWithDisconnect(request, now)
+        }
+      }
+    }
+  }
 
+  private def failExpiredRequests(now: Long): Unit = {
+    // clear all expired unsent requests
+    val expiredRequests = unsentRequests.removeExpiredRequests(now, unsentExpiryMs)
+    for (request <- expiredRequests.asScala) {
+      debug(s"Failed to send the following request after $unsentExpiryMs ms: $request")
+      completeWithDisconnect(request, now)
+    }
+  }
+
+  def completeWithDisconnect(request: ClientRequest, now: Long): Unit = {
+    val handler = request.callback
+    handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
+      handler, request.destination, now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ ,
+      null /* versionMismatch */ , null /* responseBody */))
+  }
+
+  def wakeup(): Unit = networkClient.wakeup()
 }
 
 case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       handler: RequestCompletionHandler)
\ No newline at end of file
+                                       handler: RequestCompletionHandler)
+
+private class UnsentRequests {
+  private val unsent = new HashMap[Node, ArrayDeque[ClientRequest]]
+
+  def put(node: Node, request: ClientRequest): Unit = {
+    var requests = unsent.get(node)
+    if (requests == null) {
+      requests = new ArrayDeque[ClientRequest]
+      unsent.put(node, requests)
+    }
+    requests.add(request)
+  }
+
+  def removeExpiredRequests(now: Long, unsentExpiryMs: Long): Collection[ClientRequest] = {
+    val expiredRequests = new ArrayList[ClientRequest]
+    for (requests <- unsent.values.asScala) {
+      val requestIterator = requests.iterator
+      var foundExpiredRequest = false
+      while (requestIterator.hasNext && !foundExpiredRequest) {
+        val request = requestIterator.next
+        if (request.createdTimeMs < now - unsentExpiryMs) {
+          expiredRequests.add(request)
+          requestIterator.remove()
+          foundExpiredRequest = true
+        }
+      }
+    }
+    expiredRequests
+  }
+
+  def clean(): Unit = {
+    val iterator = unsent.values.iterator
+    while (iterator.hasNext) {
+      val requests = iterator.next
+      if (requests.isEmpty)
+        iterator.remove()
+    }
+  }
+
+  def iterator(): Iterator[Entry[Node, ArrayDeque[ClientRequest]]] = {
+    unsent.entrySet().iterator()
+  }
+
+  def requestIterator(node: Node): Iterator[ClientRequest] = {
+    val requests = unsent.get(node)
+    if (requests == null)
+      Collections.emptyIterator[ClientRequest]
+    else
+      requests.iterator
+  }
+
+  def nodes = unsent.keySet
+}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index fa9d2c3fed5..7059ced5b3c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -135,6 +135,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]()
 
+  override val unsentExpiryMs: Int = config.requestTimeoutMs
+
   newGauge(
     "UnknownDestinationQueueSize",
     new Gauge[Int] {
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index c6ebdd17c36..710686693e3 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -16,14 +16,15 @@
  */
 package kafka.common
 
-import org.junit.{Assert, Test}
 import kafka.utils.MockTime
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.utils.Utils
 import org.easymock.EasyMock
+import org.junit.{Assert, Test}
 
 import scala.collection.mutable
 
@@ -35,18 +36,20 @@ class InterBrokerSendThreadTest {
   @Test
   def shouldNotSendAnythingWhenNoRequests(): Unit = {
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
       override def generateRequests() = mutable.Iterable.empty
     }
 
     // poll is always called but there should be no further invocations on NetworkClient
     EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
-    .andReturn(Utils.mkList())
+      .andReturn(Utils.mkList())
 
     EasyMock.replay(networkClient)
 
     sendThread.doWork()
 
     EasyMock.verify(networkClient)
+    Assert.assertFalse(completionHandler.executedWithDisconnectedResponse)
   }
 
   @Test
@@ -55,6 +58,7 @@ class InterBrokerSendThreadTest {
     val node = new Node(1, "", 8080)
     val handler = RequestAndCompletionHandler(node, request, completionHandler)
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
       override def generateRequests() = List[RequestAndCompletionHandler](handler)
     }
 
@@ -65,10 +69,10 @@ class InterBrokerSendThreadTest {
       EasyMock.anyLong(),
       EasyMock.eq(true),
       EasyMock.same(handler.handler)))
-    .andReturn(clientRequest)
+      .andReturn(clientRequest)
 
     EasyMock.expect(networkClient.ready(node, time.milliseconds()))
-    .andReturn(true)
+      .andReturn(true)
 
     EasyMock.expect(networkClient.send(clientRequest, time.milliseconds()))
 
@@ -80,15 +84,16 @@ class InterBrokerSendThreadTest {
     sendThread.doWork()
 
     EasyMock.verify(networkClient)
+    Assert.assertFalse(completionHandler.executedWithDisconnectedResponse)
   }
 
-
   @Test
   def shouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady(): Unit = {
     val request = new StubRequestBuilder
     val node = new Node(1, "", 8080)
     val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler)
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
       override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
     }
 
@@ -105,17 +110,66 @@ class InterBrokerSendThreadTest {
       .andReturn(false)
 
     EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), EasyMock.anyLong()))
-    .andReturn(0)
+      .andReturn(0)
 
     EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
       .andReturn(Utils.mkList())
 
+    EasyMock.expect(networkClient.connectionFailed(node))
+      .andReturn(true)
+
+    EasyMock.expect(networkClient.authenticationException(node))
+      .andReturn(new AuthenticationException(""))
+
+    EasyMock.replay(networkClient)
+
+    sendThread.doWork()
+
+    EasyMock.verify(networkClient)
+    Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
+  }
+
+  @Test
+  def testFailingExpiredRequests(): Unit = {
+    val request = new StubRequestBuilder()
+    val node = new Node(1, "", 8080)
+    val handler = RequestAndCompletionHandler(node, request, completionHandler)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
+      override def generateRequests() = List[RequestAndCompletionHandler](handler)
+    }
+
+    val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true, handler.handler)
+    time.sleep(1500)
+
+    EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+      EasyMock.same(handler.request),
+      EasyMock.eq(time.milliseconds()),
+      EasyMock.eq(true),
+      EasyMock.same(handler.handler)))
+      .andReturn(clientRequest)
+
+    // make the node unready so the request is not cleared
+    EasyMock.expect(networkClient.ready(node, time.milliseconds()))
+      .andReturn(false)
+
+    EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), EasyMock.anyLong()))
+      .andReturn(0)
+
+    EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
+      .andReturn(Utils.mkList())
+
+    // rule out disconnects so the request stays for the expiry check
+    EasyMock.expect(networkClient.connectionFailed(node))
+      .andReturn(false)
+
     EasyMock.replay(networkClient)
 
     sendThread.doWork()
 
     EasyMock.verify(networkClient)
-    Assert.assertTrue(completionHandler.response.wasDisconnected())
+    Assert.assertFalse(sendThread.hasUnsentRequests)
+    Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
   }
 
 
@@ -124,8 +178,10 @@ class InterBrokerSendThreadTest {
   }
 
   private class StubCompletionHandler extends RequestCompletionHandler {
+    var executedWithDisconnectedResponse = false
     var response: ClientResponse = _
     override def onComplete(response: ClientResponse): Unit = {
+      this.executedWithDisconnectedResponse = response.wasDisconnected()
       this.response = response
     }
   }
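
For readers who want the shape of the change without re-reading the whole diff: the patch replaces the old "send now or complete immediately with a disconnect response" loop with a per-node buffer of unsent requests that is retried on every doWork() pass, expired after unsentExpiryMs, and completed with a disconnect response only when the connection actually fails or the request times out. Below is a minimal standalone sketch of that buffering pattern (Scala; FakeNode, FakeRequest and UnsentBuffer are illustrative stand-ins rather than the Kafka classes, and the sketch omits the connection-failure check and poll-timeout bookkeeping the real thread performs):

import java.util.ArrayDeque
import scala.collection.mutable

case class FakeNode(id: Int)
case class FakeRequest(createdMs: Long, onComplete: Boolean => Unit) // callback argument: disconnected?

class UnsentBuffer(expiryMs: Long) {
  private val unsent = mutable.Map.empty[FakeNode, ArrayDeque[FakeRequest]]

  def put(node: FakeNode, req: FakeRequest): Unit =
    unsent.getOrElseUpdate(node, new ArrayDeque[FakeRequest]).add(req)

  // Send pass: drain queues only for nodes that are currently ready; everything else
  // stays buffered for the next pass instead of being failed right away.
  def sendReady(isReady: FakeNode => Boolean, send: (FakeNode, FakeRequest) => Unit): Unit =
    for ((node, reqs) <- unsent if isReady(node))
      while (!reqs.isEmpty) send(node, reqs.poll())

  // Expiry pass: requests buffered longer than expiryMs are completed with a
  // "disconnected" result, mirroring failExpiredRequests/completeWithDisconnect above.
  def failExpired(now: Long): Unit =
    for (reqs <- unsent.values) {
      val it = reqs.iterator()
      while (it.hasNext) {
        val r = it.next()
        if (r.createdMs < now - expiryMs) { it.remove(); r.onComplete(true) }
      }
    }

  // Drop queues that have become empty, as UnsentRequests.clean() does.
  def clean(): Unit = {
    val emptyNodes = unsent.collect { case (node, reqs) if reqs.isEmpty => node }
    emptyNodes.foreach(unsent.remove)
  }
}

object UnsentBufferDemo extends App {
  val buf = new UnsentBuffer(expiryMs = 1000)
  val node = FakeNode(1)
  buf.put(node, FakeRequest(createdMs = 0, disconnected => println(s"completed, disconnected=$disconnected")))

  buf.sendReady(isReady = _ => false, send = (_, _) => ()) // node not ready: request stays buffered
  buf.failExpired(now = 2000)                              // past the expiry window: completed with disconnected=true
  buf.clean()
}

The key behavioural difference from the old code is visible in the demo: a request to a not-ready node survives the pass and is only failed later, whereas the previous loop invoked the completion handler with a disconnect response on the very first attempt.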


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Windows: Consumers not polling when isolation.level=read_committed 
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6052
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6052
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, producer 
>    Affects Versions: 0.11.0.0, 1.0.1
>         Environment: Windows 10. All processes running in embedded mode.
>            Reporter: Ansel Zandegran
>            Assignee: Vahid Hashemian
>            Priority: Major
>              Labels: transactions, windows
>         Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional record with exactly-once semantics. These are my producer, consumer, and broker setups.
> public void sendWithTTemp(String topic, EHEvent event) {
>     Properties props = new Properties();
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>               "localhost:9092,localhost:9093,localhost:9094");
> //    props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
>     props.put(ProducerConfig.ACKS_CONFIG, "all");
>     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>     props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>     props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>     props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>     props.put("transactional.id", "TID" + transactionId.incrementAndGet());
>     props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
>     Producer<String, String> producer =
>         new KafkaProducer<>(props,
>                             new StringSerializer(),
>                             new StringSerializer());
>     Logger.log(this, "Initializing transaction...");
>     producer.initTransactions();
>     Logger.log(this, "Initializing done.");
>     try {
>       Logger.log(this, "Begin transaction...");
>       producer.beginTransaction();
>       Logger.log(this, "Begin transaction done.");
>       Logger.log(this, "Sending events...");
>       producer.send(new ProducerRecord<>(topic,
>                                          event.getKey().toString(),
>                                          event.getValue().toString()));
>       Logger.log(this, "Sending events done.");
>       Logger.log(this, "Committing...");
>       producer.commitTransaction();
>       Logger.log(this, "Committing done.");
>     } catch (ProducerFencedException | OutOfOrderSequenceException
>         | AuthorizationException e) {
>       producer.close();
>       e.printStackTrace();
>     } catch (KafkaException e) {
>       producer.abortTransaction();
>       e.printStackTrace();
>     }
>     producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>       Properties props = new Properties();
>       props.setProperty("broker.id", "" + i);
>       props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>       props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
>       props.setProperty("num.partitions", "1");
>       props.setProperty("zookeeper.connect", "localhost:2181");
>       props.setProperty("zookeeper.connection.timeout.ms", "6000");
>       props.setProperty("min.insync.replicas", "2");
>       props.setProperty("offsets.topic.replication.factor", "2");
>       props.setProperty("offsets.topic.num.partitions", "1");
>       props.setProperty("transaction.state.log.num.partitions", "2");
>       props.setProperty("transaction.state.log.replication.factor", "2");
>       props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set isolation.level=read_uncommitted, I get the records. I assume that the records are not getting committed. What could be the problem? Log attached.
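
For context on the consumer side of the report, a read_committed consumer matching the setup described above would look roughly like the sketch below (Scala over the Java client; the topic name, group id and bootstrap servers are placeholders, and poll(long) is used to match the 0.11.x/1.0.x client API):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object ReadCommittedConsumerSketch extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-demo")          // placeholder group id
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // Only records from committed transactions (plus non-transactional records) are returned,
  // and the consumer cannot read past the first still-open transaction (the LSO).
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("some-topic"))               // placeholder topic
  try {
    while (true) {
      val records = consumer.poll(1000L)
      for (record <- records.asScala)
        println(s"offset=${record.offset} key=${record.key} value=${record.value}")
    }
  } finally consumer.close()
}

With read_uncommitted the same loop returns records regardless of transaction state, which matches the reporter's observation that switching the isolation level makes the records appear.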



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
