metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [28/44] metron git commit: METRON-891 KafkaConsumer should not be shared among threads (jjmeyer via merrimanr) closes apache/metron#567
Date Thu, 01 Jun 2017 21:41:47 GMT
METRON-891 KafkaConsumer should not be shared among threads (jjmeyer via merrimanr) closes
apache/metron#567


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/47e2b735
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/47e2b735
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/47e2b735

Branch: refs/heads/Metron_0.4.0
Commit: 47e2b735ea682e7d647e3eaafac89a192a783e86
Parents: c0b0825
Author: jjmeyer <jjmeyer0@gmail.com>
Authored: Fri May 19 10:10:25 2017 -0500
Committer: merrimanr <merrimanr@apache.org>
Committed: Fri May 19 10:10:25 2017 -0500

----------------------------------------------------------------------
 dependencies_with_url.csv                       |   1 +
 metron-interface/metron-rest/pom.xml            |  34 +++-
 .../apache/metron/rest/config/KafkaConfig.java  |  52 ++++++-
 .../metron/rest/controller/KafkaController.java | 104 +++++++------
 .../metron/rest/service/KafkaService.java       |  45 +++++-
 .../rest/service/impl/KafkaServiceImpl.java     | 155 +++++++++++--------
 .../apache/metron/rest/config/TestConfig.java   |  32 ++--
 .../rest/service/impl/KafkaServiceImplTest.java |  15 +-
 8 files changed, 297 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 93a19b7..3752366 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -296,3 +296,4 @@ org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.or
 net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek
 org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
 org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
+org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka

http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/pom.xml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index b11e999..1c3ff92 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -33,9 +33,37 @@
         <spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version>
         <swagger.version>2.5.0</swagger.version>
         <mysql.client.version>5.1.40</mysql.client.version>
+        <spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
     </properties>
     <dependencies>
         <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <version>${spring-kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.springframework.retry</groupId>
+                    <artifactId>spring-retry</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.springframework</groupId>
+                    <artifactId>spring-messaging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
             <exclusions>
@@ -75,7 +103,7 @@
                     <groupId>com.google.guava</groupId>
                     <artifactId>guava</artifactId>
                 </exclusion>
-              </exclusions>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.googlecode.json-simple</groupId>
@@ -112,8 +140,8 @@
                     <artifactId>jackson-databind</artifactId>
                 </exclusion>
                 <exclusion>
-                  <groupId>org.reflections</groupId>
-                  <artifactId>reflections</artifactId>
+                    <groupId>org.reflections</groupId>
+                    <artifactId>reflections</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index 309a549..247264b 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -27,33 +27,56 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;
 import org.springframework.core.env.Environment;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 
-import java.util.Properties;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 
+/**
+ * Configuration used for connecting to Kafka.
+ */
 @Configuration
 @Profile("!" + TEST_PROFILE)
 public class KafkaConfig {
-
+  /**
+   * The Spring environment.
+   */
   private Environment environment;
 
+  /**
+   * Constructor used to inject {@link Environment}.
+   * @param environment Spring environment to inject.
+   */
   @Autowired
-  public KafkaConfig(Environment environment) {
+  public KafkaConfig(final Environment environment) {
     this.environment = environment;
   }
 
+  /**
+   * The client used for ZooKeeper.
+   */
   @Autowired
   private ZkClient zkClient;
 
+  /**
+   * Bean for ZooKeeper
+   */
   @Bean
   public ZkUtils zkUtils() {
     return ZkUtils.apply(zkClient, false);
   }
 
-  @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, String> kafkaConsumer() {
-    Properties props = new Properties();
+  /**
+   * Create properties that will be used by {@link #createConsumerFactory()}.
+   *
+   * @return Configurations used by {@link #createConsumerFactory()}.
+   */
+  @Bean
+  public Map<String, Object> consumerProperties() {
+    final Map<String, Object> props = new HashMap<>();
     props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
     props.put("group.id", "metron-rest");
     props.put("enable.auto.commit", "false");
@@ -64,9 +87,24 @@ public class KafkaConfig {
     if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class,
false)) {
       props.put("security.protocol", "SASL_PLAINTEXT");
     }
-    return new KafkaConsumer<>(props);
+    return props;
+  }
+
+  /**
+   * Create a {@link ConsumerFactory} which will be used for certain Kafka interactions within
config API.
+   *
+   * @return a {@link ConsumerFactory} used to create {@link KafkaConsumer} for interactions
with Kafka.
+   */
+  @Bean
+  public ConsumerFactory<String, String> createConsumerFactory() {
+    return new DefaultKafkaConsumerFactory<>(consumerProperties());
   }
 
+  /**
+   * Create a bean for {@link AdminUtils$}. This is primarily done to make testing a bit
easier.
+   *
+   * @return {@link AdminUtils$} is written in scala. We return a reference to this class.
+   */
   @Bean
   public AdminUtils$ adminUtils() {
     return AdminUtils$.MODULE$;

http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
index 0cd4d54..2787504 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
@@ -35,62 +35,78 @@ import org.springframework.web.bind.annotation.RestController;
 
 import java.util.Set;
 
+/**
+ * The API resource that is used to interact with Kafka.
+ */
 @RestController
 @RequestMapping("/api/v1/kafka")
 public class KafkaController {
 
-    @Autowired
-    private KafkaService kafkaService;
+  /**
+   * Service used to interact with Kafka.
+   */
+  @Autowired
+  private KafkaService kafkaService;
 
-    @ApiOperation(value = "Creates a new Kafka topic")
+  @ApiOperation(value = "Creates a new Kafka topic")
+  @ApiResponses({
     @ApiResponse(message = "Returns saved Kafka topic", code = 200)
-    @RequestMapping(value = "/topic", method = RequestMethod.POST)
-    ResponseEntity<KafkaTopic> save(@ApiParam(name="topic", value="Kafka topic", required=true)@RequestBody
KafkaTopic topic) throws RestException {
-        return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED);
-    }
+  })
+  @RequestMapping(value = "/topic", method = RequestMethod.POST)
+  ResponseEntity<KafkaTopic> save(final @ApiParam(name = "topic", value = "Kafka topic",
required = true) @RequestBody KafkaTopic topic) throws RestException {
+    return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED);
+  }
 
-    @ApiOperation(value = "Retrieves a Kafka topic")
-    @ApiResponses(value = { @ApiResponse(message = "Returns Kafka topic", code = 200),
-            @ApiResponse(message = "Kafka topic is missing", code = 404) })
-    @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET)
-    ResponseEntity<KafkaTopic> get(@ApiParam(name="name", value="Kafka topic name",
required=true)@PathVariable String name) throws RestException {
-        KafkaTopic kafkaTopic = kafkaService.getTopic(name);
-        if (kafkaTopic != null) {
-            return new ResponseEntity<>(kafkaTopic, HttpStatus.OK);
-        } else {
-            return new ResponseEntity<>(HttpStatus.NOT_FOUND);
-        }
+  @ApiOperation(value = "Retrieves a Kafka topic")
+  @ApiResponses(value = {
+    @ApiResponse(message = "Returns Kafka topic", code = 200),
+    @ApiResponse(message = "Kafka topic is missing", code = 404)
+  })
+  @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET)
+  ResponseEntity<KafkaTopic> get(final @ApiParam(name = "name", value = "Kafka topic
name", required = true) @PathVariable String name) throws RestException {
+    KafkaTopic kafkaTopic = kafkaService.getTopic(name);
+    if (kafkaTopic != null) {
+      return new ResponseEntity<>(kafkaTopic, HttpStatus.OK);
+    } else {
+      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
     }
+  }
 
-    @ApiOperation(value = "Retrieves all Kafka topics")
+  @ApiOperation(value = "Retrieves all Kafka topics")
+  @ApiResponses({
     @ApiResponse(message = "Returns a list of all Kafka topics", code = 200)
-    @RequestMapping(value = "/topic", method = RequestMethod.GET)
-    ResponseEntity<Set<String>> list() throws Exception {
-        return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK);
-    }
+  })
+  @RequestMapping(value = "/topic", method = RequestMethod.GET)
+  ResponseEntity<Set<String>> list() throws Exception {
+    return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK);
+  }
 
-    @ApiOperation(value = "Deletes a Kafka topic")
-    @ApiResponses(value = { @ApiResponse(message = "Kafka topic was deleted", code = 200),
-            @ApiResponse(message = "Kafka topic is missing", code = 404) })
-    @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE)
-    ResponseEntity<Void> delete(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable
String name) throws RestException {
-        if (kafkaService.deleteTopic(name)) {
-            return new ResponseEntity<>(HttpStatus.OK);
-        } else {
-            return new ResponseEntity<>(HttpStatus.NOT_FOUND);
-        }
+  @ApiOperation(value = "Deletes a Kafka topic")
+  @ApiResponses(value = {
+    @ApiResponse(message = "Kafka topic was deleted", code = 200),
+    @ApiResponse(message = "Kafka topic is missing", code = 404)
+  })
+  @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE)
+  ResponseEntity<Void> delete(final @ApiParam(name = "name", value = "Kafka topic name",
required = true) @PathVariable String name) throws RestException {
+    if (kafkaService.deleteTopic(name)) {
+      return new ResponseEntity<>(HttpStatus.OK);
+    } else {
+      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
     }
+  }
 
-    @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent
offset")
-    @ApiResponses(value = { @ApiResponse(message = "Returns sample message", code = 200),
-            @ApiResponse(message = "Either Kafka topic is missing or contains no messages",
code = 404) })
-    @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET)
-    ResponseEntity<String> getSample(@ApiParam(name="name", value="Kafka topic name",
required=true)@PathVariable String name) throws RestException {
-        String sampleMessage = kafkaService.getSampleMessage(name);
-        if (sampleMessage != null) {
-            return new ResponseEntity<>(sampleMessage, HttpStatus.OK);
-        } else {
-            return new ResponseEntity<>(HttpStatus.NOT_FOUND);
-        }
+  @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent
offset")
+  @ApiResponses(value = {
+    @ApiResponse(message = "Returns sample message", code = 200),
+    @ApiResponse(message = "Either Kafka topic is missing or contains no messages", code
= 404)
+  })
+  @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET)
+  ResponseEntity<String> getSample(final @ApiParam(name = "name", value = "Kafka topic
name", required = true) @PathVariable String name) throws RestException {
+    String sampleMessage = kafkaService.getSampleMessage(name);
+    if (sampleMessage != null) {
+      return new ResponseEntity<>(sampleMessage, HttpStatus.OK);
+    } else {
+      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
index f3cd901..bee00f2 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
@@ -22,16 +22,49 @@ import org.apache.metron.rest.model.KafkaTopic;
 
 import java.util.Set;
 
+/**
+ * This is a set of operations created to interact with Kafka.
+ */
 public interface KafkaService {
-    String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
+  /**
+   * Please see the following for documentation.
+   *
+   * @see <a href="https://kafka.apache.org/documentation/#impl_offsettracking">Kafka
offset tracking documentation</a>.
+   */
+  String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
 
-    KafkaTopic createTopic(KafkaTopic topic) throws RestException;
+  /**
+   * Create a topic in Kafka for given information.
+   * @param topic The information used to create a Kafka topic.
+   * @return The Kafka topic created.
+   * @throws RestException If exceptions occur when creating a topic they should be wrapped
in a {@link RestException}.
+   */
+  KafkaTopic createTopic(KafkaTopic topic) throws RestException;
 
-    boolean deleteTopic(String name);
+  /**
+   * Delete a topic for a given name.
+   * @param name The name of the topic to delete.
+   * @return {@code true} if the topic was deleted; otherwise {@code false}.
+   */
+  boolean deleteTopic(String name);
 
-    KafkaTopic getTopic(String name);
+  /**
+   * Retrieves the Kafka topic for a given name.
+   * @param name The name of the Kafka topic to retrieve.
+   * @return A {@link KafkaTopic} with the name of {@code name}. Null if topic with name,
{@code name}, doesn't exist.
+   */
+  KafkaTopic getTopic(String name);
 
-    Set<String> listTopics();
+  /**
+   * Returns a set of all topics.
+   * @return A set of all topics in Kafka.
+   */
+  Set<String> listTopics();
 
-    String getSampleMessage(String topic);
+  /**
+   * Return a single sample message from a given topic.
+   * @param topic The name of the topic to retrieve a sample message from.
+   * @return A string representation of the sample message retrieved. If the topic doesn't exist
or contains no messages, null will be returned.
+   */
+  String getSampleMessage(String topic);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index 33cb2e3..61e2618 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -20,8 +20,8 @@ package org.apache.metron.rest.service.impl;
 import kafka.admin.AdminOperationException;
 import kafka.admin.AdminUtils$;
 import kafka.admin.RackAwareMode;
-import kafka.admin.RackAwareMode$;
 import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -30,6 +30,7 @@ import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.KafkaTopic;
 import org.apache.metron.rest.service.KafkaService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.stereotype.Service;
 
 import java.util.HashSet;
@@ -38,88 +39,106 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * The default service layer implementation of {@link KafkaService}.
+ *
+ * @see KafkaService
+ */
 @Service
 public class KafkaServiceImpl implements KafkaService {
-    private ZkUtils zkUtils;
-    private KafkaConsumer<String, String> kafkaConsumer;
-    private AdminUtils$ adminUtils;
+  /**
+   * The timeout used when polling Kafka.
+   */
+  private static final int KAFKA_CONSUMER_TIMEOUT = 100;
 
-    @Autowired
-    public KafkaServiceImpl(ZkUtils zkUtils, KafkaConsumer<String, String> kafkaConsumer,
AdminUtils$ adminUtils) {
-        this.zkUtils = zkUtils;
-        this.kafkaConsumer = kafkaConsumer;
-        this.adminUtils = adminUtils;
-    }
+  private final ZkUtils zkUtils;
+  private final ConsumerFactory<String, String> kafkaConsumerFactory;
+  private final AdminUtils$ adminUtils;
 
-    @Override
-    public KafkaTopic createTopic(KafkaTopic topic) throws RestException {
-        if (!listTopics().contains(topic.getName())) {
-          try {
-              adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(),
topic.getReplicationFactor(), topic.getProperties(),RackAwareMode.Disabled$.MODULE$ );
-          } catch (AdminOperationException e) {
-              throw new RestException(e);
-          }
-        }
-        return topic;
+  /**
+   * @param zkUtils              A utility class used to interact with ZooKeeper.
+   * @param kafkaConsumerFactory A class used to create {@link KafkaConsumer} in order to
interact with Kafka.
+   * @param adminUtils           A utility class used to do administration operations on
Kafka.
+   */
+  @Autowired
+  public KafkaServiceImpl(final ZkUtils zkUtils,
+                          final ConsumerFactory<String, String> kafkaConsumerFactory,
+                          final AdminUtils$ adminUtils) {
+    this.zkUtils = zkUtils;
+    this.kafkaConsumerFactory = kafkaConsumerFactory;
+    this.adminUtils = adminUtils;
+  }
+
+  @Override
+  public KafkaTopic createTopic(final KafkaTopic topic) throws RestException {
+    if (!listTopics().contains(topic.getName())) {
+      try {
+        adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(),
topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
+      } catch (AdminOperationException e) {
+        throw new RestException(e);
+      }
     }
+    return topic;
+  }
 
-    @Override
-    public boolean deleteTopic(String name) {
-        Set<String> topics = listTopics();
-        if (topics != null && topics.contains(name)) {
-            adminUtils.deleteTopic(zkUtils, name);
-            return true;
-        } else {
-            return false;
-        }
+  @Override
+  public boolean deleteTopic(final String name) {
+    final Set<String> topics = listTopics();
+    if (topics != null && topics.contains(name)) {
+      adminUtils.deleteTopic(zkUtils, name);
+      return true;
+    } else {
+      return false;
     }
+  }
 
-    @Override
-    public KafkaTopic getTopic(String name) {
-        KafkaTopic kafkaTopic = null;
-        if (listTopics().contains(name)) {
-            List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(name);
-            if (partitionInfos.size() > 0) {
-                PartitionInfo partitionInfo = partitionInfos.get(0);
-                kafkaTopic = new KafkaTopic();
-                kafkaTopic.setName(name);
-                kafkaTopic.setNumPartitions(partitionInfos.size());
-                kafkaTopic.setReplicationFactor(partitionInfo.replicas().length);
-            }
+  @Override
+  public KafkaTopic getTopic(final String name) {
+    KafkaTopic kafkaTopic = null;
+    if (listTopics().contains(name)) {
+      try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer())
{
+        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(name);
+        if (partitionInfos.size() > 0) {
+          final PartitionInfo partitionInfo = partitionInfos.get(0);
+          kafkaTopic = new KafkaTopic();
+          kafkaTopic.setName(name);
+          kafkaTopic.setNumPartitions(partitionInfos.size());
+          kafkaTopic.setReplicationFactor(partitionInfo.replicas().length);
         }
-        return kafkaTopic;
+      }
     }
+    return kafkaTopic;
+  }
 
-    @Override
-    public Set<String> listTopics() {
-        Set<String> topics;
-        synchronized (this) {
-            Map<String, List<PartitionInfo>> topicsInfo = kafkaConsumer.listTopics();
-            topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet();
-            topics.remove(CONSUMER_OFFSETS_TOPIC);
-        }
-        return topics;
+  @Override
+  public Set<String> listTopics() {
+    try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer())
{
+      final Map<String, List<PartitionInfo>> topicsInfo = consumer.listTopics();
+      final Set<String> topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet();
+      topics.remove(CONSUMER_OFFSETS_TOPIC);
+      return topics;
     }
+  }
 
-    @Override
-    public String getSampleMessage(String topic) {
-        String message = null;
-        if (listTopics().contains(topic)) {
-            synchronized (this) {
-                kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
-                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
-                    .collect(Collectors.toList()));
+  @Override
+  public String getSampleMessage(final String topic) {
+    String message = null;
+    if (listTopics().contains(topic)) {
+      try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer())
{
+        kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
+          .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+          .collect(Collectors.toList()));
 
-                kafkaConsumer.assignment().stream()
-                    .filter(p -> (kafkaConsumer.position(p) -1) >= 0)
-                    .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
+        kafkaConsumer.assignment().stream()
+          .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
+          .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
 
-                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
-                message  = records.isEmpty() ? null : records.iterator().next().value();
-                kafkaConsumer.unsubscribe();
-            }
-        }
-        return message;
+        final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
+        message = records.isEmpty() ? null : records.iterator().next().value();
+        kafkaConsumer.unsubscribe();
+      }
     }
+    return message;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index edfd542..adfe056 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -25,7 +25,6 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.KafkaComponent;
@@ -36,8 +35,12 @@ import org.apache.metron.rest.service.impl.StormCLIWrapper;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.web.client.RestTemplate;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
@@ -54,7 +57,7 @@ public class TestConfig {
   @Bean
   public ZKServerComponent zkServerComponent(Properties zkProperties) {
     return new ZKServerComponent()
-            .withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY,
zkComponent.getConnectionString()));
+      .withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY,
zkComponent.getConnectionString()));
   }
 
   @Bean
@@ -66,10 +69,10 @@ public class TestConfig {
   @Bean
   public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent
kafkaWithZKComponent) {
     ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("zk", zkServerComponent)
-            .withComponent("kafka", kafkaWithZKComponent)
-            .withCustomShutdownOrder(new String[] {"kafka","zk"})
-            .build();
+      .withComponent("zk", zkServerComponent)
+      .withComponent("kafka", kafkaWithZKComponent)
+      .withCustomShutdownOrder(new String[]{"kafka", "zk"})
+      .build();
     try {
       runner.start();
     } catch (UnableToStartException e) {
@@ -78,14 +81,14 @@ public class TestConfig {
     return runner;
   }
 
-  @Bean(initMethod = "start", destroyMethod="close")
+  @Bean(initMethod = "start", destroyMethod = "close")
   public CuratorFramework client(ComponentRunner componentRunner) {
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
     ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class);
     return CuratorFrameworkFactory.newClient(zkServerComponent.getConnectionString(), retryPolicy);
   }
 
-  @Bean(destroyMethod="close")
+  @Bean(destroyMethod = "close")
   public ZkClient zkClient(ComponentRunner componentRunner) {
     ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class);
     return new ZkClient(zkServerComponent.getConnectionString(), 10000, 10000, ZKStringSerializer$.MODULE$);
@@ -96,9 +99,9 @@ public class TestConfig {
     return ZkUtils.apply(zkClient, false);
   }
 
-  @Bean(destroyMethod="close")
-  public KafkaConsumer<String, String> kafkaConsumer(KafkaComponent kafkaWithZKComponent)
{
-    Properties props = new Properties();
+  @Bean
+  public Map<String, Object> kafkaConsumer(KafkaComponent kafkaWithZKComponent) {
+    Map<String, Object> props = new HashMap<>();
     props.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList());
     props.put("group.id", "metron-config");
     props.put("enable.auto.commit", "false");
@@ -106,7 +109,12 @@ public class TestConfig {
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-    return new KafkaConsumer<>(props);
+    return props;
+  }
+
+  @Bean
+  public ConsumerFactory<String, String> createConsumerFactory() {
+    return new DefaultKafkaConsumerFactory<>(kafkaConsumer(kafkaWithZKComponent(zkProperties())));
   }
 
   @Bean

http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
index c7d42b3..c92feab 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.kafka.core.ConsumerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -71,6 +72,7 @@ public class KafkaServiceImplTest {
 
   private ZkUtils zkUtils;
   private KafkaConsumer<String, String> kafkaConsumer;
+  private ConsumerFactory<String, String> kafkaConsumerFactory;
   private AdminUtils$ adminUtils;
 
   private KafkaService kafkaService;
@@ -86,10 +88,13 @@ public class KafkaServiceImplTest {
   @Before
   public void setUp() throws Exception {
     zkUtils = mock(ZkUtils.class);
+    kafkaConsumerFactory = mock(ConsumerFactory.class);
     kafkaConsumer = mock(KafkaConsumer.class);
     adminUtils = mock(AdminUtils$.class);
 
-    kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumer, adminUtils);
+    when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer);
+
+    kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, adminUtils);
   }
 
   @Test
@@ -104,6 +109,7 @@ public class KafkaServiceImplTest {
 
     verifyZeroInteractions(zkUtils);
     verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer).close();
     verifyNoMoreInteractions(kafkaConsumer, zkUtils, adminUtils);
   }
 
@@ -119,6 +125,7 @@ public class KafkaServiceImplTest {
 
     verifyZeroInteractions(zkUtils);
     verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer).close();
     verifyNoMoreInteractions(kafkaConsumer, zkUtils);
   }
 
@@ -137,6 +144,7 @@ public class KafkaServiceImplTest {
 
     verifyZeroInteractions(zkUtils);
     verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer).close();
     verifyNoMoreInteractions(kafkaConsumer, zkUtils);
   }
 
@@ -156,6 +164,7 @@ public class KafkaServiceImplTest {
 
     verifyZeroInteractions(zkUtils);
     verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer).close();
     verifyNoMoreInteractions(kafkaConsumer, zkUtils);
   }
 
@@ -167,6 +176,7 @@ public class KafkaServiceImplTest {
 
     verifyZeroInteractions(zkUtils);
     verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer).close();
     verifyNoMoreInteractions(kafkaConsumer, zkUtils);
   }
 
@@ -180,6 +190,7 @@ public class KafkaServiceImplTest {
     assertTrue(kafkaService.deleteTopic("non_existent_topic"));
 
     verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer).close();
     verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic");
     verifyNoMoreInteractions(kafkaConsumer);
   }
@@ -193,6 +204,7 @@ public class KafkaServiceImplTest {
     assertFalse(kafkaService.deleteTopic("non_existent_topic"));
 
     verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer).close();
     verifyNoMoreInteractions(kafkaConsumer);
   }
 
@@ -230,6 +242,7 @@ public class KafkaServiceImplTest {
 
     verify(kafkaConsumer).listTopics();
     verify(kafkaConsumer, times(0)).partitionsFor("t");
+    verify(kafkaConsumer).close();
     verifyZeroInteractions(zkUtils);
     verifyNoMoreInteractions(kafkaConsumer);
   }


Mime
View raw message