kafka-commits mailing list archives

From jjko...@apache.org
Subject [1/2] kafka git commit: KAFKA-2120; Add a request timeout to NetworkClient (KIP-19); reviewed by Jason Gustafson, Ismael Juma, Joel Koshy, Jun Rao, and Edward Ribeiro
Date Tue, 29 Sep 2015 17:37:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4e7db3955 -> 2c4e63a89
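
For context, KIP-19 threads an explicit request timeout through NetworkClient and its users: the producer's Sender, the controller channel, the replica fetcher, and controlled shutdown. As a rough sketch of what this means for client configuration (not part of the commit; the key name comes from CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, and the bootstrap address and topic are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RequestTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // New with KIP-19: requests with no response within this many
            // milliseconds are failed (and retried, if retries are enabled).
            props.put("request.timeout.ms", "30000");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.send(new ProducerRecord<byte[], byte[]>("test", "hello".getBytes()));
            producer.close();
        }
    }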


http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 5b2e4ff..dcc52b6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -40,6 +40,8 @@ import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
 public class RecordAccumulatorTest {
@@ -63,17 +65,18 @@ public class RecordAccumulatorTest {
     private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
+    private final long maxBlockTimeMs = 1000;
 
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time,  metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time,  metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, null);
+            accum.append(tp1, key, value, null, maxBlockTimeMs);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, null);
+        accum.append(tp1, key, value, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -91,16 +94,16 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], null);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
+        accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, null);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        accum.append(tp1, key, value, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -117,12 +120,12 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, null);
+                accum.append(tp, key, value, null, maxBlockTimeMs);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -136,14 +139,14 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, null, maxBlockTimeMs);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -177,13 +180,13 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, null);
+            accum.append(tp1, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -192,14 +195,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, null);
+            accum.append(tp3, key, value, null, maxBlockTimeMs);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, null);
+            accum.append(tp2, key, value, null, maxBlockTimeMs);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -211,10 +214,10 @@ public class RecordAccumulatorTest {
     public void testRetryBackoff() throws Exception {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, metricTags);
 
         long now = time.milliseconds();
-        accum.append(tp1, key, value, null);
+        accum.append(tp1, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -226,7 +229,7 @@ public class RecordAccumulatorTest {
         accum.reenqueue(batches.get(0).get(0), now);
 
         // Put message for partition 1 into accumulator
-        accum.append(tp2, key, value, null);
+        accum.append(tp2, key, value, null, maxBlockTimeMs);
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
 
@@ -248,9 +251,9 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, null);
+            accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         
@@ -272,7 +275,7 @@ public class RecordAccumulatorTest {
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -281,7 +284,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback());
+            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback(), maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
 
@@ -291,4 +294,24 @@ public class RecordAccumulatorTest {
 
     }
 
+    @Test
+    public void testExpiredBatches() throws InterruptedException {
+        Time time = new SystemTime();
+        long now = time.milliseconds();
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time, metricTags);
+        int appends = 1024 / msgSize;
+        for (int i = 0; i < appends; i++) {
+            accum.append(tp1, key, value, null, maxBlockTimeMs);
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
+        }
+        time.sleep(2000);
+        accum.ready(cluster, now);
+        accum.append(tp1, key, value, null, 0);
+        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+        now = time.milliseconds();
+        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
+        assertEquals(1, expiredBatches.size());
+    }
 }
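
The test changes above reflect the two producer-internal API shifts in this patch: RecordAccumulator.append() now takes an explicit maxBlockTimeMs bounding how long an append may wait for buffer space (the blockOnBufferFull constructor flag is gone), and batches that outlive the request timeout are surfaced via abortExpiredBatches(). A minimal standalone sketch of that contract, with signatures taken from the diff, setup borrowed from the test fixtures, and everything else hypothetical:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import org.apache.kafka.clients.producer.internals.RecordAccumulator;
    import org.apache.kafka.clients.producer.internals.RecordBatch;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.utils.SystemTime;
    import org.apache.kafka.common.utils.Time;

    public class AccumulatorTimeoutSketch {
        public static void main(String[] args) throws InterruptedException {
            Time time = new SystemTime();
            // Constructor no longer takes the blockOnBufferFull boolean.
            RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,
                CompressionType.NONE, 10L, 100L, new Metrics(),
                time, new LinkedHashMap<String, String>());
            // append() now bounds its blocking time explicitly.
            accum.append(new TopicPartition("test", 0), "key".getBytes(),
                "value".getBytes(), null /* callback */, 1000L /* maxBlockTimeMs */);
            // Batches older than the request timeout can be aborted in bulk.
            Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
            List<RecordBatch> expired = accum.abortExpiredBatches(60, cluster, time.milliseconds());
            System.out.println(expired.size() + " expired batch(es)");
        }
    }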

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index aa44991..bcf6a3a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -44,10 +44,11 @@ public class SenderTest {
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
     private static final short ACKS_ALL = -1;
     private static final int MAX_RETRIES = 0;
-    private static final int REQUEST_TIMEOUT_MS = 10000;
     private static final String CLIENT_ID = "clientId";
     private static final String METRIC_GROUP = "producer-metrics";
     private static final double EPS = 0.0001;
+    private static final int MAX_BLOCK_TIMEOUT = 1000;
+    private static final int REQUEST_TIMEOUT = 1000;
 
     private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
@@ -57,17 +58,17 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, metricTags);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
                                        MAX_REQUEST_SIZE,
                                        ACKS_ALL,
                                        MAX_RETRIES,
-                                       REQUEST_TIMEOUT_MS,
                                        metrics,
                                        time,
-                                       CLIENT_ID);
+                                       CLIENT_ID,
+                                       REQUEST_TIMEOUT);
 
     @Before
     public void setup() {
@@ -78,7 +79,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -97,7 +98,7 @@ public class SenderTest {
     public void testQuotaMetrics() throws Exception {
         final long offset = 0;
         for (int i = 1; i <= 3; i++) {
-            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+            Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
             client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i));
             sender.run(time.milliseconds());
@@ -119,12 +120,12 @@ public class SenderTest {
                                    MAX_REQUEST_SIZE,
                                    ACKS_ALL,
                                    maxRetries,
-                                   REQUEST_TIMEOUT_MS,
                                    new Metrics(),
                                    time,
-                                   "clientId");
+                                   "clientId",
+                                   REQUEST_TIMEOUT);
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -141,7 +142,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
 
         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
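
Easy to miss in the wrapped hunks above: the Sender constructor loses the timeout slot between maxRetries and metrics, and the request timeout instead becomes the final argument, after clientId. In outline (argument names as used in this test; a reading aid, not code from the commit):

    // Before: new Sender(client, metadata, accumulator, MAX_REQUEST_SIZE, ACKS_ALL,
    //                    MAX_RETRIES, REQUEST_TIMEOUT_MS, metrics, time, CLIENT_ID)
    // After:  new Sender(client, metadata, accumulator, MAX_REQUEST_SIZE, ACKS_ALL,
    //                    MAX_RETRIES, metrics, time, CLIENT_ID, REQUEST_TIMEOUT)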

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
index df1205c..5056c71 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -13,7 +13,6 @@
 package org.apache.kafka.common.network;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -101,24 +100,6 @@ public class SSLSelectorTest {
         assertEquals("hello", blockingRequest(node, "hello"));
     }
 
-
-    /**
-     * Validate that the client can intentionally disconnect and reconnect
-     */
-    @Test
-    public void testClientDisconnect() throws Exception {
-        String node = "0";
-        blockingConnect(node);
-        selector.disconnect(node);
-        selector.send(createSend(node, "hello1"));
-        selector.poll(10L);
-        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
-        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
-        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
-        blockingConnect(node);
-        assertEquals("hello2", blockingRequest(node, "hello2"));
-    }
-
      /**
      * Tests wrap BUFFER_OVERFLOW  and unwrap BUFFER_UNDERFLOW
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 3a684d9..66ca530 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -85,23 +85,6 @@ public class SelectorTest {
     }
 
     /**
-     * Validate that the client can intentionally disconnect and reconnect
-     */
-    @Test
-    public void testClientDisconnect() throws Exception {
-        String node = "0";
-        blockingConnect(node);
-        selector.disconnect(node);
-        selector.send(createSend(node, "hello1"));
-        selector.poll(10);
-        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
-        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
-        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
-        blockingConnect(node);
-        assertEquals("hello2", blockingRequest(node, "hello2"));
-    }
-
-    /**
      * Sending a request with one already in flight should result in an exception
      */
     @Test(expected = IllegalStateException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index f83fd9b..5a5f963 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -45,11 +45,6 @@ public class MockSelector implements Selectable {
     }
 
     @Override
-    public void disconnect(String id) {
-        this.disconnected.add(id);
-    }
-
-    @Override
     public void wakeup() {
     }
 
@@ -59,6 +54,7 @@ public class MockSelector implements Selectable {
 
     @Override
     public void close(String id) {
+        this.disconnected.add(id);
     }
 
     public void clear() {
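
With the request timeout now driving disconnects, Selectable drops its explicit disconnect(id) method, and MockSelector records a disconnect when close(id) is called, which is why the two testClientDisconnect tests above are removed rather than rewritten. A hypothetical snippet against the updated mock (MockSelector and MockTime as in the test tree):

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.test.MockSelector;

    public class MockSelectorCloseSketch {
        public static void main(String[] args) {
            MockSelector selector = new MockSelector(new MockTime());
            // disconnect(id) is gone; closing the connection is what
            // surfaces the node id in disconnected().
            selector.close("0");
            System.out.println(selector.disconnected()); // prints [0]
        }
    }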

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 1220c38..64e3355 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -106,7 +106,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         1,
         0,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
-        Selectable.USE_DEFAULT_BUFFER_SIZE
+        Selectable.USE_DEFAULT_BUFFER_SIZE,
+        config.requestTimeoutMs
       )
     }
     val threadName = threadNamePrefix match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1c594f6..46f4a25 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -64,6 +64,7 @@ object Defaults {
   val MaxConnectionsPerIp: Int = Int.MaxValue
   val MaxConnectionsPerIpOverrides: String = ""
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
+  val RequestTimeoutMs = 30000
 
   /** ********* Log Configuration ***********/
   val NumPartitions = 1
@@ -99,7 +100,7 @@ object Defaults {
   val MinInSyncReplicas = 1
 
   /** ********* Replication configuration ***********/
-  val ControllerSocketTimeoutMs = 30000
+  val ControllerSocketTimeoutMs = RequestTimeoutMs
   val ControllerMessageQueueSize = Int.MaxValue
   val DefaultReplicationFactor = 1
   val ReplicaLagTimeMaxMs = 10000L
@@ -197,6 +198,7 @@ object KafkaConfig {
   val NumIoThreadsProp = "num.io.threads"
   val BackgroundThreadsProp = "background.threads"
   val QueuedMaxRequestsProp = "queued.max.requests"
+  val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
   /** ********* Socket Server Configuration ***********/
@@ -340,6 +342,7 @@ object KafkaConfig {
   val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
   val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+  val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization"
   /** ********* Socket Server Configuration ***********/
@@ -515,6 +518,7 @@ object KafkaConfig {
       .define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
       .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
       .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
+      .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@@ -693,6 +697,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
   val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
   val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
+  val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
 
   /************* Authorizer Configuration ***********/
   val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)
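
On the broker side, the same key becomes a first-class server config: KafkaConfig.RequestTimeoutMsProp reuses CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG ("request.timeout.ms"), defaults to 30000, and the controller socket timeout now defaults to it. A hypothetical server.properties fragment:

    # KIP-19: timeout for the broker's internal NetworkClient instances
    # (controller channel, replica fetcher, controlled shutdown).
    # Default is 30000; controller.socket.timeout.ms now defaults to this value.
    request.timeout.ms=30000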

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a60efb..e2c8f48 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -313,7 +313,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           1,
           0,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
-          Selectable.USE_DEFAULT_BUFFER_SIZE)
+          Selectable.USE_DEFAULT_BUFFER_SIZE,
+          config.requestTimeoutMs)
       }
 
       var shutdownSucceeded: Boolean = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 12698c7..82a6001 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -83,7 +83,8 @@ class ReplicaFetcherThread(name: String,
       1,
       0,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
-      brokerConfig.replicaSocketReceiveBufferBytes
+      brokerConfig.replicaSocketReceiveBufferBytes,
+      brokerConfig.requestTimeoutMs
     )
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 46a68e9..e299f8b 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -38,7 +38,6 @@ import org.apache.log4j.Logger
 object ProducerPerformance extends Logging {
 
   def main(args: Array[String]) {
-
     val logger = Logger.getLogger(getClass)
     val config = new ProducerPerfConfig(args)
     if (!config.isFixedSize)
@@ -195,7 +194,6 @@ object ProducerPerformance extends Logging {
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
         props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
-        props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
         props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
         props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
         props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index ad10721..9ed9d29 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -75,7 +75,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * care.
    */
   def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
-    client.send(request)
+    client.send(request, time.milliseconds())
 
     pollUntilFound(timeout) { case (responses, _) =>
       val response = responses.find { response =>
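
The blocking-ops change tracks the new NetworkClient.send() signature, which now takes the current time so the client can stamp when each request went out and later expire it against the request timeout. In caller terms (a sketch; client, request, time, and timeoutMs stand in for the wrapper's own values):

    // send() records the send time; a later poll() can then fail requests
    // whose request.timeout.ms elapsed without a response.
    client.send(request, time.milliseconds());
    List<ClientResponse> responses = client.poll(timeoutMs, time.milliseconds());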

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 1198df0..e90818a 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -144,49 +144,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   }
 
   /**
-   * 1. With ack=0, the future metadata should not be blocked.
-   * 2. With ack=1, the future metadata should block,
-   *    and subsequent calls will eventually cause buffer full
-   */
-  @Test
-  def testNoResponse() {
-    // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
-
-    // first send a message to make sure the metadata is refreshed
-    val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
-    producer1.send(record1).get
-    producer2.send(record1).get
-
-    // stop IO threads and request handling, but leave networking operational
-    // any requests should be accepted and queue up, but not handled
-    servers.foreach(server => server.requestHandlerPool.shutdown())
-
-    producer1.send(record1).get(5000, TimeUnit.MILLISECONDS)
-
-    intercept[TimeoutException] {
-      producer2.send(record1).get(5000, TimeUnit.MILLISECONDS)
-    }
-
-    // TODO: expose producer configs after creating them
-    // send enough messages to get buffer full
-    val tooManyRecords = 10
-    val msgSize = producerBufferSize / tooManyRecords
-    val value = new Array[Byte](msgSize)
-    new Random().nextBytes(value)
-    val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value)
-
-    intercept[KafkaException] {
-      for (i <- 1 to tooManyRecords)
-        producer2.send(record2)
-    }
-
-    // do not close produce2 since it will block
-    // TODO: can we do better?
-    producer2 = null
-  }
-
-  /**
    *  The send call with invalid partition id should throw KafkaException caused by IllegalArgumentException
    */
   @Test
@@ -287,7 +244,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     } catch {
       case e: ExecutionException =>
         if (!e.getCause.isInstanceOf[NotEnoughReplicasException]  &&
-            !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException]) {
+            !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] &&
+            !e.getCause.isInstanceOf[TimeoutException]) {
           fail("Expected NotEnoughReplicasException or NotEnoughReplicasAfterAppendException when producing to topic " +
             "with fewer brokers than min.insync.replicas, but saw " + e.getCause)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 5b4f2db..bfec426 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -404,6 +404,7 @@ class KafkaConfigTest {
         case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c4e63a8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2343ea1..8e4c263 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -410,7 +410,6 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
-    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")

