Repository: camel
Updated Branches:
refs/heads/camel-2.17.x a4543cffb -> 52c30bf28
CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/52c30bf2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/52c30bf2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/52c30bf2
Branch: refs/heads/camel-2.17.x
Commit: 52c30bf28404b328583f81aa06ccf54db9d3be98
Parents: a4543cf
Author: Andrea Cosentino <ancosen@gmail.com>
Authored: Fri Apr 8 15:03:50 2016 +0200
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Apr 8 15:18:33 2016 +0200
----------------------------------------------------------------------
.../component/kafka/KafkaConfiguration.java | 54 +-------------------
.../camel/component/kafka/KafkaEndpoint.java | 24 ---------
.../component/kafka/KafkaComponentTest.java | 6 ---
3 files changed, 1 insertion(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 6132841..6b67fc5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -182,18 +182,9 @@ public class KafkaConfiguration {
//ssl.truststore.type
@UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
- //timeout.ms
- @UriParam(label = "producer", defaultValue = "30000")
- private Integer timeoutMs = 30000;
- //block.on.buffer.full
- @UriParam(label = "producer", defaultValue = "false")
- private Boolean blockOnBufferFull = false;
//max.in.flight.requests.per.connection
@UriParam(label = "producer", defaultValue = "5")
private Integer maxInFlightRequest = 5;
- //metadata.fetch.timeout.ms
- @UriParam(label = "producer", defaultValue = "60000")
- private Integer metadataFetchTimeoutMs = 600 * 1000;
//metadata.max.age.ms
@UriParam(label = "producer", defaultValue = "300000")
private Integer metadataMaxAgeMs = 300000;
@@ -274,10 +265,7 @@ public class KafkaConfiguration {
addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
- addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
- addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
- addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
@@ -387,7 +375,7 @@ public class KafkaConfiguration {
}
/**
- * Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+ * Name of the topic to use.
*/
public void setTopic(String topic) {
this.topic = topic;
@@ -868,20 +856,6 @@ public class KafkaConfiguration {
this.bufferMemorySize = bufferMemorySize;
}
- public Boolean getBlockOnBufferFull() {
- return blockOnBufferFull;
- }
-
- /**
- * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
- * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
- * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
- * a BufferExhaustedException if a recrord is sent and the buffer space is full.
- */
- public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
- this.blockOnBufferFull = blockOnBufferFull;
- }
-
public Integer getRequestRequiredAcks() {
return requestRequiredAcks;
}
@@ -1001,20 +975,6 @@ public class KafkaConfiguration {
this.receiveBufferBytes = receiveBufferBytes;
}
- public Integer getTimeoutMs() {
- return timeoutMs;
- }
-
- /**
- * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
- * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
- * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
- * the network latency of the request.
- */
- public void setTimeoutMs(Integer timeoutMs) {
- this.timeoutMs = timeoutMs;
- }
-
public Integer getMaxInFlightRequest() {
return maxInFlightRequest;
}
@@ -1027,18 +987,6 @@ public class KafkaConfiguration {
this.maxInFlightRequest = maxInFlightRequest;
}
- public Integer getMetadataFetchTimeoutMs() {
- return metadataFetchTimeoutMs;
- }
-
- /**
- * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
- * This fetch to succeed before throwing an exception back to the client.
- */
- public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
- this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
- }
-
public Integer getMetadataMaxAgeMs() {
return metadataMaxAgeMs;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5a56c39..369537a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -283,10 +283,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
configuration.setBrokers(brokers);
}
- public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
- configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
- }
-
public String getValueDeserializer() {
return configuration.getValueDeserializer();
}
@@ -375,10 +371,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
configuration.setSslKeystorePassword(sslKeystorePassword);
}
- public Boolean getBlockOnBufferFull() {
- return configuration.getBlockOnBufferFull();
- }
-
public void setCheckCrcs(Boolean checkCrcs) {
configuration.setCheckCrcs(checkCrcs);
}
@@ -415,10 +407,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
configuration.setSslKeyPassword(sslKeyPassword);
}
- public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
- configuration.setBlockOnBufferFull(blockOnBufferFull);
- }
-
public Integer getRequestRequiredAcks() {
return configuration.getRequestRequiredAcks();
}
@@ -495,10 +483,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
return configuration.getSslTruststorePassword();
}
- public void setTimeoutMs(Integer timeoutMs) {
- configuration.setTimeoutMs(timeoutMs);
- }
-
public void setConsumerStreams(int consumerStreams) {
configuration.setConsumerStreams(consumerStreams);
}
@@ -551,10 +535,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
return configuration.getPartitionAssignor();
}
- public Integer getMetadataFetchTimeoutMs() {
- return configuration.getMetadataFetchTimeoutMs();
- }
-
public void setSecurityProtocol(String securityProtocol) {
configuration.setSecurityProtocol(securityProtocol);
}
@@ -655,10 +635,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
return configuration.getSendBufferBytes();
}
- public Integer getTimeoutMs() {
- return configuration.getTimeoutMs();
- }
-
public String getSslProtocol() {
return configuration.getSslProtocol();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 31c2dd6..1c2c564 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -64,21 +64,18 @@ public class KafkaComponentTest {
assertEquals(new Integer(10), endpoint.getProducerBatchSize());
assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
assertEquals(new Integer(1), endpoint.getMaxBlockMs());
- assertEquals(false, endpoint.getBlockOnBufferFull());
assertEquals(new Integer(1), endpoint.getBufferMemorySize());
assertEquals("testing", endpoint.getClientId());
assertEquals("none", endpoint.getCompressionCodec());
assertEquals(new Integer(1), endpoint.getLingerMs());
assertEquals(new Integer(100), endpoint.getMaxRequestSize());
assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
- assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
assertEquals(new Integer(0), endpoint.getRetries());
assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
assertEquals(765, endpoint.getSendBufferBytes().intValue());
- assertEquals(new Integer(2045), endpoint.getTimeoutMs());
assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters());
assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
@@ -134,10 +131,7 @@ public class KafkaComponentTest {
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
- props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
- props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
- props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
|