sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tma...@apache.org
Subject [sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8451 - Document KafkaEndpoint configuration properties and use Kafka default
Date Fri, 31 May 2019 13:22:12 GMT
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f09ba  SLING-8451 - Document KafkaEndpoint configuration properties and use Kafka default
65f09ba is described below

commit 65f09baeb2f7581039f798d948a59c5321a07358
Author: tmaret <tmaret@adobe.com>
AuthorDate: Fri May 31 15:21:36 2019 +0200

    SLING-8451 - Document KafkaEndpoint configuration properties and use Kafka default
---
 .../journal/kafka/KafkaClientProvider.java         |  4 +-
 .../distribution/journal/kafka/KafkaEndpoint.java  | 16 +++----
 .../journal/kafka/KafkaClientProviderTest.java     | 13 ++----
 .../journal/kafka/util/KafkaEndpointBuilder.java   | 49 ++++++++++++++++++++++
 .../distribution/journal/kafka/util/KafkaRule.java | 11 +++--
 5 files changed, 67 insertions(+), 26 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index 75c3e99..bc515eb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -267,9 +267,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
         config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
         config.put(SASL_MECHANISM, saslMechanism);
         config.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
-        if (!saslJaasConfig.isEmpty()) {
-            config.put(SASL_JAAS_CONFIG, saslJaasConfig);
-        }
+        config.put(SASL_JAAS_CONFIG, saslJaasConfig);
         return config;
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
index 7301005..22100ed 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
@@ -22,7 +22,7 @@ import org.osgi.service.metatype.annotations.AttributeDefinition;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 
 @ObjectClassDefinition(name = "Apache Sling Journal Distribution - Kafka endpoint",
-        description = "Apache Kafka Endpoint")
+        description = "Apache Kafka Endpoint. Default values match Kafka properties defaults.")
 public @interface KafkaEndpoint {
 
     @AttributeDefinition(name = "Kafka Bootstrap Servers",
@@ -37,16 +37,16 @@ public @interface KafkaEndpoint {
             description = "Kafka Default API Timeout in ms.")
     int kafkaDefaultApiTimeout() default 60000;
     
-    @AttributeDefinition(name = "Security protocol",
-            description = "e.g. SASL_SSL")
+    @AttributeDefinition(name = "Kafka Security protocol",
+            description = "Kafka Protocol used to communicate with brokers.")
     String securityProtocol() default "PLAINTEXT";
     
-    @AttributeDefinition(name = "Sasl mechanism",
-            description = "e.g. PLAIN")
+    @AttributeDefinition(name = "Kafka SASL mechanism",
+            description = "Kafka SASL mechanism used for client connections.")
     String saslMechanism() default "GSSAPI";
     
-    @AttributeDefinition(name = "Sasl jaas config",
-            description = "")
-    String saslJaasConfig() default "";
+    @AttributeDefinition(name = "Kafka SASL JAAS config",
+            description = "Kafka JAAS login context parameters for SASL connections in the format used by JAAS configuration files.")
+    String saslJaasConfig();
 
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index 29448f8..5ed0699 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -18,11 +18,12 @@
  */
 package org.apache.sling.distribution.journal.kafka;
 
+import static java.util.Collections.emptyMap;
+import static org.apache.sling.distribution.journal.kafka.util.KafkaEndpointBuilder.buildKafkaEndpoint;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
-import static org.osgi.util.converter.Converters.standardConverter;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,13 +57,13 @@ public class KafkaClientProviderTest {
     @Before
     public void before() {
         doReturn(consumer).when(provider).createConsumer(Mockito.any(), Mockito.any());
-        KafkaEndpoint config = createConfig();
+        KafkaEndpoint config = buildKafkaEndpoint(emptyMap());
         provider.activate(config);
     }
 
     @Test
     public void testAssertTopicWhenDoesNotExist() throws Exception {
-        when(consumer.listTopics()).thenReturn(Collections.emptyMap());
+        when(consumer.listTopics()).thenReturn(emptyMap());
         try {
             provider.assertTopic(TOPIC);
             Assert.fail();
@@ -113,10 +114,4 @@ public class KafkaClientProviderTest {
         String assign = provider.assignTo(1l);
         assertThat(assign, equalTo("0:1"));
     }
-
-    private KafkaEndpoint createConfig() {
-        Map<String, String> props = new HashMap<>();
-        KafkaEndpoint config = standardConverter().convert(props).to(KafkaEndpoint.class);
-        return config;
-    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaEndpointBuilder.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaEndpointBuilder.java
new file mode 100644
index 0000000..be5dde9
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaEndpointBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka.util;
+
+import java.util.Map;
+
+import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+public class KafkaEndpointBuilder {
+
+    public static KafkaEndpoint buildKafkaEndpoint(Map<String, Object> props) {
+
+        /*
+         * The standardConverter does not support null default
+         * Until FELIX-6137 is fixed, we use this 'creative' way
+         * to build KafkaEndpoint.
+         */
+
+        KafkaEndpoint proxy = standardConverter().convert(props).to(KafkaEndpoint.class);
+        KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
+        when(endpoint.saslJaasConfig()).thenReturn(null);
+        when(endpoint.securityProtocol()).thenReturn(proxy.securityProtocol());
+        when(endpoint.kafkaBootstrapServers()).thenReturn(proxy.kafkaBootstrapServers());
+        when(endpoint.kafkaDefaultApiTimeout()).thenReturn(proxy.kafkaDefaultApiTimeout());
+        when(endpoint.kafkaRequestTimeout()).thenReturn(proxy.kafkaRequestTimeout());
+        when(endpoint.saslMechanism()).thenReturn(proxy.saslMechanism());
+        return endpoint;
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
index 7f73649..87a7e6a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -18,9 +18,8 @@
  */
 package org.apache.sling.distribution.journal.kafka.util;
 
-import static org.osgi.util.converter.Converters.standardConverter;
+import static org.apache.sling.distribution.journal.kafka.util.KafkaEndpointBuilder.buildKafkaEndpoint;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -46,7 +45,7 @@ public class KafkaRule implements TestRule {
         };
     }
 
-    private void runWithKafka(Statement base) throws Throwable, IOException, Exception {
+    private void runWithKafka(Statement base) throws Throwable {
         try (KafkaLocal kafka = new KafkaLocal()) {
             this.provider = createProvider();
             base.evaluate();
@@ -56,10 +55,10 @@ public class KafkaRule implements TestRule {
 
     private KafkaClientProvider createProvider() {
         KafkaClientProvider provider = new KafkaClientProvider();
-        
-        Map<String, String> props = new HashMap<>();
+
+        Map<String, Object> props = new HashMap<>();
         props.put("connectTimeout", "5000");
-        KafkaEndpoint config = standardConverter().convert(props).to(KafkaEndpoint.class);
+        KafkaEndpoint config = buildKafkaEndpoint(props);
         provider.activate(config);
         return provider;
     }


Mime
View raw message