camel-commits mailing list archives

From acosent...@apache.org
Subject camel git commit: CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties)
Date Fri, 08 Apr 2016 13:08:12 GMT
Repository: camel
Updated Branches:
  refs/heads/master fd659c108 -> d84cc7005


CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d84cc700
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d84cc700
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d84cc700

Branch: refs/heads/master
Commit: d84cc70056160fdf98b286c52a9d558663d8e8e1
Parents: fd659c1
Author: Andrea Cosentino <ancosen@gmail.com>
Authored: Fri Apr 8 15:03:50 2016 +0200
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Apr 8 15:05:11 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc |  9 ++--
 .../component/kafka/KafkaConfiguration.java     | 54 +-------------------
 .../camel/component/kafka/KafkaEndpoint.java    | 24 ---------
 .../component/kafka/KafkaComponentTest.java     |  6 ---
 4 files changed, 5 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 7528eb9..0f23849 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -65,8 +65,9 @@ The Kafka component has no options.
 
 
 
+
 // endpoint options: START
-The Kafka component supports 74 endpoint options which are listed below:
+The Kafka component supports 71 endpoint options which are listed below:
 
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -76,7 +77,7 @@ The Kafka component supports 74 endpoint options which are listed below:
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
-| topic | common |  | String | *Required* Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+| topic | common |  | String | *Required* Name of the topic to use.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that the consumer offsets are committed to zookeeper.
 | autoOffsetReset | consumer | latest | String | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset fail: throw exception to the consumer
@@ -98,7 +99,6 @@ The Kafka component supports 74 endpoint options which are listed below:
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| blockOnBufferFull | producer | false | Boolean | When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default this setting is true and we block however in some scenarios blocking is not desirable and it is better to immediately give an error. Setting this to false will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full.
 | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
 | compressionCodec | producer | none | String | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number of milliseconds specified by this config.
@@ -111,7 +111,6 @@ The Kafka component supports 74 endpoint options which are listed below:
 | maxBlockMs | producer | 60000 | Integer | The configuration controls how long sending to kafka will block. These methods can be blocked for multiple reasons. For e.g: buffer full metadata unavailable.This configuration imposes maximum limit on the total time spent in fetching metadata serialization of key and value partitioning and allocation of buffer memory when doing a send(). In case of partitionsFor() this configuration imposes a maximum time threshold on waiting for metadata
 | maxInFlightRequest | producer | 5 | Integer | The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends there is a risk of message re-ordering due to retries (i.e. if retries are enabled).
 | maxRequestSize | producer | 1048576 | Integer | The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
-| metadataFetchTimeoutMs | producer | 60000 | Integer | The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This fetch to succeed before throwing an exception back to the client.
 | metadataMaxAgeMs | producer | 300000 | Integer | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
 | metricReporters | producer |  | String | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.
 | metricsSampleWindowMs | producer | 30000 | Integer | The number of samples maintained to compute metrics.
@@ -142,13 +141,13 @@ The Kafka component supports 74 endpoint options which are listed below:
 | sslTruststoreLocation | producer |  | String | The location of the trust store file.
 | sslTruststorePassword | producer |  | String | The password for the trust store file.
 | sslTruststoreType | producer | JKS | String | The file format of the trust store file. Default value is JKS.
-| timeoutMs | producer | 30000 | Integer | The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================
 // endpoint options: END
 
 
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 49d7fe2..e0580e9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -184,18 +184,9 @@ public class KafkaConfiguration {
     //ssl.truststore.type
     @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
     private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
-    //timeout.ms
-    @UriParam(label = "producer", defaultValue = "30000")
-    private Integer timeoutMs = 30000;
-    //block.on.buffer.full
-    @UriParam(label = "producer", defaultValue = "false")
-    private Boolean blockOnBufferFull = false;
     //max.in.flight.requests.per.connection
     @UriParam(label = "producer", defaultValue = "5")
     private Integer maxInFlightRequest = 5;
-    //metadata.fetch.timeout.ms
-    @UriParam(label = "producer", defaultValue = "60000")
-    private Integer metadataFetchTimeoutMs = 600 * 1000;
     //metadata.max.age.ms
     @UriParam(label = "producer", defaultValue = "300000")
     private Integer metadataMaxAgeMs = 300000;
@@ -276,10 +267,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
         addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
         addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
-        addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
-        addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
         addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
-        addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
         addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
         addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
         addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
@@ -389,7 +377,7 @@ public class KafkaConfiguration {
     }
 
     /**
-     * Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+     * Name of the topic to use.
      */
     public void setTopic(String topic) {
         this.topic = topic;
@@ -870,20 +858,6 @@ public class KafkaConfiguration {
         this.bufferMemorySize = bufferMemorySize;
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return blockOnBufferFull;
-    }
-
-    /**
-     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
-     * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
-     * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
-     * a BufferExhaustedException if a recrord is sent and the buffer space is full.
-     */
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        this.blockOnBufferFull = blockOnBufferFull;
-    }
-
     public Integer getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
@@ -1003,20 +977,6 @@ public class KafkaConfiguration {
         this.receiveBufferBytes = receiveBufferBytes;
     }
 
-    public Integer getTimeoutMs() {
-        return timeoutMs;
-    }
-
-    /**
-     * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
-     * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
-     * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
-     * the network latency of the request.
-     */
-    public void setTimeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-    }
-
     public Integer getMaxInFlightRequest() {
         return maxInFlightRequest;
     }
@@ -1029,18 +989,6 @@ public class KafkaConfiguration {
         this.maxInFlightRequest = maxInFlightRequest;
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return metadataFetchTimeoutMs;
-    }
-
-    /**
-     * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
-     * This fetch to succeed before throwing an exception back to the client.
-     */
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
-    }
-
     public Integer getMetadataMaxAgeMs() {
         return metadataMaxAgeMs;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 327ecdc..1c239c8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -283,10 +283,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setBrokers(brokers);
     }
 
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
-    }
-
     public String getValueDeserializer() {
         return configuration.getValueDeserializer();
     }
@@ -375,10 +371,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeystorePassword(sslKeystorePassword);
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return configuration.getBlockOnBufferFull();
-    }
-
     public void setCheckCrcs(Boolean checkCrcs) {
         configuration.setCheckCrcs(checkCrcs);
     }
@@ -415,10 +407,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        configuration.setBlockOnBufferFull(blockOnBufferFull);
-    }
-
     public Integer getRequestRequiredAcks() {
         return configuration.getRequestRequiredAcks();
     }
@@ -495,10 +483,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSslTruststorePassword();
     }
 
-    public void setTimeoutMs(Integer timeoutMs) {
-        configuration.setTimeoutMs(timeoutMs);
-    }
-
     public void setConsumerStreams(int consumerStreams) {
         configuration.setConsumerStreams(consumerStreams);
     }
@@ -551,10 +535,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getPartitionAssignor();
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return configuration.getMetadataFetchTimeoutMs();
-    }
-
     public void setSecurityProtocol(String securityProtocol) {
         configuration.setSecurityProtocol(securityProtocol);
     }
@@ -655,10 +635,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSendBufferBytes();
     }
 
-    public Integer getTimeoutMs() {
-        return configuration.getTimeoutMs();
-    }
-
     public String getSslProtocol() {
         return configuration.getSslProtocol();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 31c2dd6..1c2c564 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -64,21 +64,18 @@ public class KafkaComponentTest {
         assertEquals(new Integer(10), endpoint.getProducerBatchSize());
         assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
         assertEquals(new Integer(1), endpoint.getMaxBlockMs());
-        assertEquals(false, endpoint.getBlockOnBufferFull());
         assertEquals(new Integer(1), endpoint.getBufferMemorySize());
         assertEquals("testing", endpoint.getClientId());
         assertEquals("none", endpoint.getCompressionCodec());
         assertEquals(new Integer(1), endpoint.getLingerMs());
         assertEquals(new Integer(100), endpoint.getMaxRequestSize());
         assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
-        assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
         assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
         assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
         assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
         assertEquals(new Integer(0), endpoint.getRetries());
         assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
         assertEquals(765, endpoint.getSendBufferBytes().intValue());
-        assertEquals(new Integer(2045), endpoint.getTimeoutMs());
         assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
         assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters());
         assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
@@ -134,10 +131,7 @@ public class KafkaComponentTest {
         props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
-        props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
-        props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
-        props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
         props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
         props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
         props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
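
Note for readers carrying configuration across this change: the commit removes the endpoint options timeoutMs, blockOnBufferFull and metadataFetchTimeoutMs, which the comments in KafkaConfiguration.java map to the producer keys timeout.ms, block.on.buffer.full and metadata.fetch.timeout.ms. The sketch below is not part of the commit. It assumes you hold producer settings in a plain java.util.Properties and want to drop those three keys before reuse; the class name LegacyProducerProps and the method withoutLegacyKeys are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of camel-kafka.
class LegacyProducerProps {

    // Producer keys whose endpoint options are removed by the commit above.
    static final List<String> LEGACY_KEYS = Arrays.asList(
            "timeout.ms",
            "block.on.buffer.full",
            "metadata.fetch.timeout.ms");

    // Returns a copy of the given properties without the legacy keys.
    static Properties withoutLegacyKeys(Properties source) {
        Properties result = new Properties();
        for (String name : source.stringPropertyNames()) {
            if (!LEGACY_KEYS.contains(name)) {
                result.setProperty(name, source.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("timeout.ms", "30000");
        props.setProperty("block.on.buffer.full", "false");
        props.setProperty("metadata.fetch.timeout.ms", "60000");
        props.setProperty("metadata.max.age.ms", "300000");
        props.setProperty("max.in.flight.requests.per.connection", "5");

        Properties cleaned = withoutLegacyKeys(props);
        // Only the two keys that the commit leaves in place remain.
        System.out.println(cleaned.stringPropertyNames().size()); // prints 2
    }
}
```

The helper copies rather than mutates, so the original Properties instance can still be inspected to see which legacy values were set.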

