camel-commits mailing list archives

From davscl...@apache.org
Subject camel git commit: CAMEL-10213: Add maxPollRecords option. Thanks to Oliver Holzmann for reporting.
Date Mon, 01 Aug 2016 14:27:33 GMT
Repository: camel
Updated Branches:
  refs/heads/master a3551e3f1 -> dc7770149


CAMEL-10213: Add maxPollRecords option. Thanks to Oliver Holzmann for reporting.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc777014
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc777014
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc777014

Branch: refs/heads/master
Commit: dc77701493d993621ec20c835ddeba62cfaff9fe
Parents: a3551e3
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Aug 1 16:27:01 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Aug 1 16:27:01 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc     |  9 +++++++--
 .../camel/component/kafka/KafkaConfiguration.java   | 16 ++++++++++++++++
 .../apache/camel/component/kafka/KafkaEndpoint.java | 11 ++++++++++-
 3 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc777014/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index fde33e3..d8753d2 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -91,15 +91,16 @@ The Kafka component supports 1 options which are listed below.
 
 
 
+
+
 // endpoint options: START
-The Kafka component supports 75 endpoint options which are listed below:
+The Kafka component supports 76 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | brokers | common |  | String | *Required* This is for bootstrapping and the producer will
only use it for getting metadata (topics partitions and replicas). The socket connections
for sending the actual data will be established based on the broker information returned in
the metadata. The format is host1:port1,host2:port2 and the list can be a subset of brokers
or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in
the Kafka documentation.
-| bridgeEndpoint | common | false | boolean | If the option is true then KafkaProducer will
ignore the KafkaConstants.TOPIC header setting of the inbound message.
 | clientId | common |  | String | The client id is a user-specified string sent in each request
to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer
processes to which this consumer belongs. By setting the same group id multiple processes
indicate that they are all part of the same consumer group.
 | kerberosBeforeReloginMinTime | common | 60000 | Integer | Login thread sleep time between
refresh attempts.
@@ -123,12 +124,14 @@ The Kafka component supports 75 endpoint options which are listed below:
 | heartbeatIntervalMs | consumer | 3000 | Integer | The expected time between heartbeats
to the consumer coordinator when using Kafka's group management facilities. Heartbeats are
used to ensure that the consumer's session stays active and to facilitate rebalancing when
new consumers join or leave the group. The value must be set lower than session.timeout.ms
but typically should be set no higher than 1/3 of that value. It can be adjusted even lower
to control the expected time for normal rebalances.
 | keyDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer |
String | Deserializer class for key that implements the Deserializer interface.
 | maxPartitionFetchBytes | consumer | 1048576 | Integer | The maximum amount of data per-partition
the server will return. The maximum total memory used for a request will be partitions max.partition.fetch.bytes.
This size must be at least as large as the maximum message size the server allows or else
it is possible for the producer to send messages larger than the consumer can fetch. If that
happens the consumer can get stuck trying to fetch a large message on a certain partition.
+| maxPollRecords | consumer | 2147483647 | Integer | The maximum number of records returned
in a single call to poll().
 | partitionAssignor | consumer | org.apache.kafka.clients.consumer.RangeAssignor | String
| The class name of the partition assignment strategy that the client will use to distribute
partition ownership amongst consumer instances when group management is used
 | pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the KafkaConsumer.
 | seekToBeginning | consumer | false | boolean | If the option is true then KafkaConsumer
will read from beginning on startup.
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when
using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer
| String | Deserializer class for value that implements the Deserializer interface.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use
a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options
is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR
level and ignored.
+| bridgeEndpoint | producer | false | boolean | If the option is true then KafkaProducer
will ignore the KafkaConstants.TOPIC header setting of the inbound message.
 | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory the producer
can use to buffer records waiting to be sent to the server. If records are sent faster than
they can be delivered to the server the producer will either block or throw an exception based
on the preference specified by block.on.buffer.full.This setting should correspond roughly
to the total memory the producer will use but is not a hard bound since not all memory the
producer uses is used for buffering. Some additional memory will be used for compression (if
compression is enabled) as well as for maintaining in-flight requests.
 | compressionCodec | producer | none | String | This parameter allows you to specify the
compression codec for all data generated by this producer. Valid values are none gzip and
snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number
of milliseconds specified by this config.
@@ -190,6 +193,8 @@ The Kafka component supports 75 endpoint options which are listed below:
 
 
 
+
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
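The new endpoint option is set like any other query parameter on the kafka endpoint URI. A minimal sketch of building such a URI (broker address, topic, and group id are placeholder values, not from the commit):

```java
public class MaxPollRecordsUriExample {

    // Camel endpoint options map onto URI query parameters,
    // so maxPollRecords is appended like any other option.
    public static String buildUri(String brokers, String topic, String groupId, int maxPollRecords) {
        return "kafka:" + brokers
                + "?topic=" + topic
                + "&groupId=" + groupId
                + "&maxPollRecords=" + maxPollRecords;
    }

    public static void main(String[] args) {
        String uri = buildUri("localhost:9092", "orders", "order-consumers", 500);
        System.out.println(uri);
        // kafka:localhost:9092?topic=orders&groupId=order-consumers&maxPollRecords=500
    }
}
```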

http://git-wip-us.apache.org/repos/asf/camel/blob/dc777014/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index a530795..deb12a1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -71,6 +71,8 @@ public class KafkaConfiguration {
     //session.timeout.ms
     @UriParam(label = "consumer", defaultValue = "30000")
     private Integer sessionTimeoutMs = 30000;
+    @UriParam(label = "consumer", defaultValue = "2147483647")
+    private Integer maxPollRecords;
     @UriParam(label = "consumer", defaultValue = "5000")
     private Long pollTimeoutMs = 5000L;
     //auto.offset.reset1
@@ -310,6 +312,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+        addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
         // SSL
         addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
@@ -1091,6 +1094,19 @@ public class KafkaConfiguration {
         this.sessionTimeoutMs = sessionTimeoutMs;
     }
 
+    public Integer getMaxPollRecords() {
+        return maxPollRecords;
+    }
+
+    /**
+     * The maximum number of records returned in a single call to poll().
+     */
+    public void setMaxPollRecords(Integer maxPollRecords) {
+        this.maxPollRecords = maxPollRecords;
+    }
+
     public Long getPollTimeoutMs() {
         return pollTimeoutMs;
     }
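Because the new field stays null unless the user configures it, the addPropertyIfNotNull call above only forwards max.poll.records to the Kafka consumer when it was actually set; otherwise the Kafka client's own default (2147483647, as documented above) applies. A simplified sketch of that null-skipping pattern (standalone helper, not the actual Camel method):

```java
import java.util.Properties;

public class NullSkippingProps {

    // Simplified version of the addPropertyIfNotNull pattern used in
    // KafkaConfiguration: null means "not configured", so the key is
    // omitted and the Kafka client falls back to its own default.
    public static void addPropertyIfNotNull(Properties props, String key, Object value) {
        if (value != null) {
            props.put(key, value.toString());
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        Integer maxPollRecords = null;                               // not configured
        addPropertyIfNotNull(props, "max.poll.records", maxPollRecords);
        addPropertyIfNotNull(props, "session.timeout.ms", 30000);    // configured
        System.out.println(props);                                   // only session.timeout.ms present
    }
}
```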

http://git-wip-us.apache.org/repos/asf/camel/blob/dc777014/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index e918bfd..ff0028e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -51,7 +51,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
-    @UriParam
+    @UriParam(label = "producer")
     private boolean bridgeEndpoint;
 
     public KafkaEndpoint() {
@@ -744,4 +744,13 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
         configuration.setWorkerPoolCoreSize(workerPoolCoreSize);
     }
+
+    public Integer getMaxPollRecords() {
+        return configuration.getMaxPollRecords();
+    }
+
+    public void setMaxPollRecords(Integer maxPollRecords) {
+        configuration.setMaxPollRecords(maxPollRecords);
+    }
+
 }

