kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-1261 Make it possible to configure the metadata refresh.
Date Thu, 13 Feb 2014 18:54:27 GMT
Updated Branches:
  refs/heads/trunk 84a5803a7 -> 7e154a36f


KAFKA-1261 Make it possible to configure the metadata refresh.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7e154a36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7e154a36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7e154a36

Branch: refs/heads/trunk
Commit: 7e154a36f74ad0ea7e0f6d48b71a5a73d99330e7
Parents: 84a5803
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Wed Feb 12 15:30:08 2014 -0800
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Thu Feb 13 10:54:07 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 29 +++++++--------
 .../kafka/clients/producer/ProducerConfig.java  | 38 ++++++++++++--------
 2 files changed, 36 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7e154a36/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4b2f556..3d180e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
@@ -49,7 +45,6 @@ import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
 
-
 /**
  * A Kafka client that publishes records to the Kafka cluster.
  * <P>
@@ -94,7 +89,8 @@ public class KafkaProducer implements Producer {
                                    new SystemTime());
         this.partitioner = new Partitioner();
         this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-        this.metadata = new Metadata();
+        this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG),
+                                     config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG));
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
@@ -256,6 +252,7 @@ public class KafkaProducer implements Producer {
      */
     @Override
     public void close() {
+        this.accumulator.close();
         this.sender.initiateClose();
         try {
             this.ioThread.join();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7e154a36/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 502af5c..dca9802 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
@@ -25,7 +21,6 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
-
 /**
  * The producer configuration keys
  */
@@ -48,6 +43,17 @@ public class ProducerConfig extends AbstractConfig {
     public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
 
     /**
+     * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly.
+     */
+    public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms";
+
+    /**
+     * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any
+     * leadership changes.
+     */
+    public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms";
+
+    /**
      * The buffer size allocated for a partition. When records are received which are smaller than this size the
      * producer will attempt to optimistically group them together until this size is reached.
      */
@@ -125,6 +131,8 @@ public class ProducerConfig extends AbstractConfig {
         /* TODO: add docs */
         config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah")
                                 .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah")
+                                .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah")
+                                .define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah")
                                 .define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah")
                                 .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah")
                                 /* TODO: should be a string to handle acks=in-sync */


Mime
View raw message