camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject camel git commit: CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties)
Date Fri, 08 Apr 2016 13:20:38 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x a4543cffb -> 52c30bf28


CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/52c30bf2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/52c30bf2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/52c30bf2

Branch: refs/heads/camel-2.17.x
Commit: 52c30bf28404b328583f81aa06ccf54db9d3be98
Parents: a4543cf
Author: Andrea Cosentino <ancosen@gmail.com>
Authored: Fri Apr 8 15:03:50 2016 +0200
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Apr 8 15:18:33 2016 +0200

----------------------------------------------------------------------
 .../component/kafka/KafkaConfiguration.java     | 54 +-------------------
 .../camel/component/kafka/KafkaEndpoint.java    | 24 ---------
 .../component/kafka/KafkaComponentTest.java     |  6 ---
 3 files changed, 1 insertion(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 6132841..6b67fc5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -182,18 +182,9 @@ public class KafkaConfiguration {
     //ssl.truststore.type
     @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
     private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
-    //timeout.ms
-    @UriParam(label = "producer", defaultValue = "30000")
-    private Integer timeoutMs = 30000;
-    //block.on.buffer.full
-    @UriParam(label = "producer", defaultValue = "false")
-    private Boolean blockOnBufferFull = false;
     //max.in.flight.requests.per.connection
     @UriParam(label = "producer", defaultValue = "5")
     private Integer maxInFlightRequest = 5;
-    //metadata.fetch.timeout.ms
-    @UriParam(label = "producer", defaultValue = "60000")
-    private Integer metadataFetchTimeoutMs = 600 * 1000;
     //metadata.max.age.ms
     @UriParam(label = "producer", defaultValue = "300000")
     private Integer metadataMaxAgeMs = 300000;
@@ -274,10 +265,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
         addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
         addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
-        addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
-        addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
         addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
-        addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
         addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
         addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
         addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
@@ -387,7 +375,7 @@ public class KafkaConfiguration {
     }
 
     /**
-     * Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+     * Name of the topic to use.
      */
     public void setTopic(String topic) {
         this.topic = topic;
@@ -868,20 +856,6 @@ public class KafkaConfiguration {
         this.bufferMemorySize = bufferMemorySize;
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return blockOnBufferFull;
-    }
-
-    /**
-     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
-     * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
-     * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
-     * a BufferExhaustedException if a recrord is sent and the buffer space is full.
-     */
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        this.blockOnBufferFull = blockOnBufferFull;
-    }
-
     public Integer getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
@@ -1001,20 +975,6 @@ public class KafkaConfiguration {
         this.receiveBufferBytes = receiveBufferBytes;
     }
 
-    public Integer getTimeoutMs() {
-        return timeoutMs;
-    }
-
-    /**
-     * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
-     * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
-     * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
-     * the network latency of the request.
-     */
-    public void setTimeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-    }
-
     public Integer getMaxInFlightRequest() {
         return maxInFlightRequest;
     }
@@ -1027,18 +987,6 @@ public class KafkaConfiguration {
         this.maxInFlightRequest = maxInFlightRequest;
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return metadataFetchTimeoutMs;
-    }
-
-    /**
-     * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
-     * This fetch to succeed before throwing an exception back to the client.
-     */
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
-    }
-
     public Integer getMetadataMaxAgeMs() {
         return metadataMaxAgeMs;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5a56c39..369537a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -283,10 +283,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setBrokers(brokers);
     }
 
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
-    }
-
     public String getValueDeserializer() {
         return configuration.getValueDeserializer();
     }
@@ -375,10 +371,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeystorePassword(sslKeystorePassword);
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return configuration.getBlockOnBufferFull();
-    }
-
     public void setCheckCrcs(Boolean checkCrcs) {
         configuration.setCheckCrcs(checkCrcs);
     }
@@ -415,10 +407,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        configuration.setBlockOnBufferFull(blockOnBufferFull);
-    }
-
     public Integer getRequestRequiredAcks() {
         return configuration.getRequestRequiredAcks();
     }
@@ -495,10 +483,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSslTruststorePassword();
     }
 
-    public void setTimeoutMs(Integer timeoutMs) {
-        configuration.setTimeoutMs(timeoutMs);
-    }
-
     public void setConsumerStreams(int consumerStreams) {
         configuration.setConsumerStreams(consumerStreams);
     }
@@ -551,10 +535,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getPartitionAssignor();
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return configuration.getMetadataFetchTimeoutMs();
-    }
-
     public void setSecurityProtocol(String securityProtocol) {
         configuration.setSecurityProtocol(securityProtocol);
     }
@@ -655,10 +635,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSendBufferBytes();
     }
 
-    public Integer getTimeoutMs() {
-        return configuration.getTimeoutMs();
-    }
-
     public String getSslProtocol() {
         return configuration.getSslProtocol();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 31c2dd6..1c2c564 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -64,21 +64,18 @@ public class KafkaComponentTest {
         assertEquals(new Integer(10), endpoint.getProducerBatchSize());
         assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
         assertEquals(new Integer(1), endpoint.getMaxBlockMs());
-        assertEquals(false, endpoint.getBlockOnBufferFull());
         assertEquals(new Integer(1), endpoint.getBufferMemorySize());
         assertEquals("testing", endpoint.getClientId());
         assertEquals("none", endpoint.getCompressionCodec());
         assertEquals(new Integer(1), endpoint.getLingerMs());
         assertEquals(new Integer(100), endpoint.getMaxRequestSize());
         assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
-        assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
         assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
         assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
         assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
         assertEquals(new Integer(0), endpoint.getRetries());
         assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
         assertEquals(765, endpoint.getSendBufferBytes().intValue());
-        assertEquals(new Integer(2045), endpoint.getTimeoutMs());
         assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
         assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters());
         assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
@@ -134,10 +131,7 @@ public class KafkaComponentTest {
         props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
-        props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
-        props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
-        props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
         props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
         props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
         props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");


Mime
View raw message