kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: Creating a Consumer Thread Pool
Date Thu, 29 Aug 2013 04:00:27 GMT
Do you "Shutting down Thread: " in the output? Are all threads shut down?

Thanks,

Jun


On Wed, Aug 28, 2013 at 1:52 PM, David Williams <dwilliams@truecar.com>wrote:

> Hi Jun,
>
> Thanks for following up.  I removed the statement but still see no
> messages from the producer.  Also when that statement is in with the
> single threaded consumer example, it prints "non-empty iterator" for its
> toString method versus "empty iterator" in the non working multi stream
> example.
>
> Here is the code.  When this is running in a loop, I have been sending
> messages via the console producer script.
>
>
> AppCongig.java
> --------------------------------------------------------------
> import javax.inject.Named;
> import java.util.Properties;
> import kafka.consumer.ConsumerConfig;
> import kafka.producer.ProducerConfig;
> import org.springframework.context.annotation.Bean;
> import org.springframework.context.annotation.ComponentScan;
> import org.springframework.context.annotation.Configuration;
>
> @Configuration
> @ComponentScan("com.example.kafka")
> public class AppConfig {
>
>     @Bean
>     @Named("sharedProducerConsumerConfig")
>     private static Properties sharedProducerConsumerConfig() {
>         Properties properties = new Properties();
>         properties.put("zookeeper.connect", "127.0.0.1:2181");
>         properties.put("group.id", "group1");
>         properties.put("zookeeper.session.timeout.ms", "400");
>         properties.put("zookeeper.sync.time.ms", "200");
>         properties.put("auto.commit.interval.ms", "1000");
>         return properties;
>     }
>
>     @Bean
>     @Named("consumerConfig")
>     private static ConsumerConfig consumerConfig() {
>         Properties properties = sharedProducerConsumerConfig();
>         return new ConsumerConfig(properties);
>     }
>
>     @Bean
>     @Named("producerConfig")
>     private static ProducerConfig producerConfig() {
>         Properties properties = sharedProducerConsumerConfig();
>         properties.put("serializer.class",
> "kafka.serializer.StringEncoder");
>         properties.put("metadata.broker.list", "localhost:9092");
>         return new ProducerConfig(properties);
>     }
>
> }
>
>
>
>
> Consumer.java
> -------------------------------------------------------------------
> import kafka.consumer.KafkaStream;
> import kafka.consumer.ConsumerIterator;
>
> public class Consumer implements Runnable {
>
>     private KafkaStream kafkaStream;
>     private Integer threadNumber;
>
>     public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
>         this.threadNumber = threadNumber;
>         this.kafkaStream = kafkaStream;
>     }
>
>     public void run() {
>         ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
>         while(true) {
>
>             try {
>                 Thread.sleep(1000);
>             } catch (InterruptedException e) {
>                 break;
>             }
>
>             while(it.hasNext()) {
>
>                 System.out.println("Thread " + threadNumber + ": " + new
> String(it.next().message()));
>
>             }
>         }
>         System.out.println("Shutting down Thread: " + threadNumber);
>     }
> }
>
>
>
>
>
> ConsumerThreadPool.java (the run method does not work, the runSingleWorker
> method does work)
> ---------------------------------------------------------------------------
> --------------------
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.consumer.ConsumerConfig;
> import kafka.javaapi.consumer.ConsumerConnector;
>
> import java.util.Map;
> import java.util.List;
> import java.util.HashMap;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ExecutorService;
>
> import org.springframework.context.ApplicationContext;
> import
> org.springframework.context.annotation.AnnotationConfigApplicationContext;
>
> import com.truecar.inventory.worker.core.application.config.AppConfig;
>
> public class ConsumerThreadPool {
>
>     private final ConsumerConnector consumer;
>     private final String topic;
>
>     private ExecutorService executor;
>     private static ApplicationContext context = new
> AnnotationConfigApplicationContext(AppConfig.class);
>
>     public ConsumerThreadPool(String topic) {
>         consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context
> .getBean("consumerConfig"));
>         this.topic = topic;
>     }
>
>     public void shutdown() {
>         if (consumer != null) consumer.shutdown();
>         if (executor != null) executor.shutdown();
>     }
>
>     public void run(Integer numThreads) {
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
>         topicCountMap.put(topic, numThreads);
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> consumer.createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> topicListeners =
> consumerMap.get(topic);
>
>         executor = Executors.newFixedThreadPool(numThreads);
>
>         for(Integer i = 0; i < numThreads; i++ ){
>             KafkaStream<byte[], byte[]> stream =  topicListeners.get(i);
>             executor.submit(new Consumer(stream, i));
>         }
>     }
>
>
>     public void runSingleWorker(Integer numThreads) {
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
>         topicCountMap.put(topic, new Integer(1));
>
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> consumer.createMessageStreams(topicCountMap);
>
>         KafkaStream<byte[], byte[]> stream =
> consumerMap.get(topic).get(0);
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while(true) {
>             try {
>                 Thread.sleep(1000);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>             while(it.hasNext()){
>                 System.out.println(new String(it.next().message()));
>
>             }
>         }
>     }
>
> }
>
>
>
>
>
>
> Pom.xml
> ------------------------------------------------------------------------
> <?xml version="1.0" encoding="UTF-8"?>
> <project
>     xmlns="http://maven.apache.org/POM/4.0.0"
>     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>     xsi:schemaLocation="
>         http://maven.apache.org/POM/4.0.0
>         http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>     <groupId>group1</groupId>
>     <artifactId>artifact1</artifactId>
>     <version>0.1.0</version>
>     <packaging>jar</packaging>
>     <properties>
>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>
> <org.springframework.version>3.2.4.RELEASE</org.springframework.version>
>     </properties>
>     <dependencies>
>         <dependency>
>             <groupId>org.springframework</groupId>
>             <artifactId>spring-core</artifactId>
>             <version>3.2.4.RELEASE</version>
>         </dependency>
>         <dependency>
>             <groupId>org.springframework</groupId>
>             <artifactId>spring-context</artifactId>
>             <version>3.2.4.RELEASE</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.9.2</artifactId>
>             <version>0.8.0-beta1</version>
>         </dependency>
>         <dependency>
>             <groupId>javax.inject</groupId>
>             <artifactId>javax.inject</artifactId>
>             <version>1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.scala-lang</groupId>
>             <artifactId>scala-library</artifactId>
>             <version>2.9.2</version>
>         </dependency>
>         <dependency>
>             <groupId>log4j</groupId>
>             <artifactId>log4j</artifactId>
>             <version>1.2.17</version>
>         </dependency>
>         <dependency>
>             <groupId>com.101tec</groupId>
>             <artifactId>zkclient</artifactId>
>             <version>0.3</version>
>         </dependency>
>         <dependency>
>             <groupId>com.yammer.metrics</groupId>
>             <artifactId>metrics-core</artifactId>
>             <version>2.2.0</version>
>         </dependency>
>     </dependencies>
>     <build>
>         <finalName>inventory-core</finalName>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-compiler-plugin</artifactId>
>                 <version>3.0</version>
>                 <configuration>
>                     <source>1.7</source>
>                     <target>1.7</target>
>                 </configuration>
>             </plugin>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-jar-plugin</artifactId>
>                 <configuration>
>                     <archive>
>                         <manifest>
>
> <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass
> >
>                         </manifest>
>                     </archive>
>                 </configuration>
>             </plugin>
>             <plugin>
>                 <groupId>org.dstovall</groupId>
>                 <artifactId>onejar-maven-plugin</artifactId>
>                 <version>1.4.4</version>
>                 <executions>
>                     <execution>
>                         <configuration>
>                             <onejarVersion>0.97</onejarVersion>
>                             <classifier>onejar</classifier>
>                         </configuration>
>                         <goals>
>                             <goal>one-jar</goal>
>                         </goals>
>                     </execution>
>                 </executions>
>             </plugin>
>         </plugins>
>     </build>
>     <pluginRepositories>
>         <pluginRepository>
>             <id>onejar-maven-plugin.googlecode.com</id>
>
> <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url>
>         </pluginRepository>
>     </pluginRepositories>
> </project>
>
>
>
>
>
>
>
>
>
>
> --
>
>
>
>
>
>
> On 8/28/13 8:24 AM, "Jun Rao" <junrao@gmail.com> wrote:
>
> >Could you remove the following statement and see if it works?
> >
> >System.out.println("Created iterator " + it.toString() + " thread number "
> >+ threadNumber);
> >
> >Thanks,
> >
> >Jun
> >
> >
> >On Tue, Aug 27, 2013 at 3:43 PM, David Williams
> ><dwilliams@truecar.com>wrote:
> >
> >>
> >> Hi all,
> >>
> >> I checked out the java source and looked at the java examples.  They
> >> worked well in my IDE and on the console.  However, I also tried the
> >> threaded example following the consumer group example.  The problem is,
> >> this example is not working and toString on the stream iterator returns
> >>the
> >> words "empty iterator".  Below, run2() method is the run method from the
> >> source code, THAT WORKS.  The run() method below is from the Consumer
> >>Group
> >> Example and DOES NOT WORK.
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> >>
> >> It simply prints messages like
> >>
> >> Created iterator empty iterator thread number 9
> >> Created iterator empty iterator thread number 1
> >> Shutting down Thread: 1
> >> Created iterator empty iterator thread number 3
> >>
> >> And continues doing so as I produce message using the console producer
> >>and
> >> does not print messages.
> >>
> >>
> >>
> >>
> >> Im not sure if this is a versioning issue, or what might be the cause.
> >> But help is appreciated!
> >>
> >>
> >>
> >> Here is the Consumer class:
> >>
> >> import kafka.consumer.KafkaStream;
> >> import kafka.consumer.ConsumerIterator;
> >>
> >> public class Consumer implements Runnable {
> >>
> >>     private KafkaStream kafkaStream;
> >>     private Integer threadNumber;
> >>
> >>     public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
> >>         this.threadNumber = threadNumber;
> >>         this.kafkaStream = kafkaStream;
> >>     }
> >>
> >>     public void run() {
> >>         ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
> >>         System.out.println("Created iterator " + it.toString() + "
> >>thread
> >> number " + threadNumber);
> >>         while(it.hasNext()) {
> >>             System.out.println("Thread " + threadNumber + ": " + new
> >> String(it.next().message()));
> >>
> >>             // validate
> >>             // enrich
> >>             // dispatch
> >>         }
> >>         System.out.println("Shutting down Thread: " + threadNumber);
> >>     }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> In my ConsumerThreadPool class:
> >>
> >>
> >> public class ConsumerThreadPool {
> >>
> >>     private final ConsumerConnector consumer;
> >>     private final String topic;
> >>
> >>     private ExecutorService executor;
> >>     private static ApplicationContext context = new
> >> AnnotationConfigApplicationContext(AppConfig.class);
> >>
> >>     public ConsumerThreadPool(String topic) {
> >>         consumer =
> >>
> >>kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)conte
> >>xt.getBean("consumerConfig"));
> >>         this.topic = topic;
> >>     }
> >>
> >>     public void shutdown() {
> >>         if (consumer != null) consumer.shutdown();
> >>         if (executor != null) executor.shutdown();
> >>     }
> >>
> >>     public void run(Integer numThreads) {
> >>         Map<String, Integer> topicCountMap = new HashMap<String,
> >> Integer>();
> >>
> >>         topicCountMap.put(topic, new Integer(numThreads));
> >>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> >> consumer.createMessageStreams(topicCountMap);
> >>         List<KafkaStream<byte[], byte[]>> streams =
> >>consumerMap.get(topic);
> >>
> >>         // create threads
> >>         executor = Executors.newFixedThreadPool(numThreads);
> >>
> >>         // now create an object to consume the messages
> >>         Integer threadNumber = 0;
> >>         for(KafkaStream<byte[], byte[]> stream : streams) {
> >>             executor.submit(new Consumer(stream, threadNumber));
> >>             threadNumber++;
> >>         }
> >>     }
> >>
> >>
> >>     public void run2() {
> >>         Map<String, Integer> topicCountMap = new HashMap<String,
> >> Integer>();
> >>
> >>         topicCountMap.put(topic, new Integer(1));
> >>
> >>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> >> consumer.createMessageStreams(topicCountMap);
> >>
> >>         KafkaStream<byte[], byte[]> stream =
> >>  consumerMap.get(topic).get(0);
> >>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >>         while(true) {
> >>             try {
> >>                 Thread.sleep(1000);
> >>             } catch (InterruptedException e) {
> >>                 e.printStackTrace();
> >>             }
> >>             while(it.hasNext()){
> >>                 System.out.println(new String(it.next().message()));
> >>
> >>             }
> >>         }
> >>     }
> >>
> >> }
> >>
> >>
> >>
> >> The AppConfig is pretty simple:
> >>
> >> @Configuration
> >> @ComponentScan("com.truecar.inventory.worker.core")
> >> public class AppConfig {
> >>
> >>     @Bean
> >>     @Named("sharedProducerConsumerConfig")
> >>     private static Properties sharedProducerConsumerConfig() {
> >>         Properties properties = new Properties();
> >>         properties.put("zookeeper.connect", "127.0.0.1:2181");
> >>         properties.put("group.id", "intelligence");
> >>         properties.put("zookeeper.session.timeout.ms", "400");
> >>         properties.put("zookeeper.sync.time.ms", "200");
> >>         properties.put("auto.commit.interval.ms", "1000");
> >>         return properties;
> >>     }
> >>
> >>     @Bean
> >>     @Named("consumerConfig")
> >>     private static ConsumerConfig consumerConfig() {
> >>         Properties properties = sharedProducerConsumerConfig();
> >>         return new ConsumerConfig(properties);
> >>     }
> >>
> >>     @Bean
> >>     @Named("producerConfig")
> >>     private static ProducerConfig producerConfig() {
> >>         Properties properties = sharedProducerConsumerConfig();
> >>         properties.put("serializer.class",
> >> "kafka.serializer.StringEncoder");
> >>         properties.put("metadata.broker.list", "localhost:9092");
> >>         return new ProducerConfig(properties);
> >>     }
> >>
> >> }
> >>
> >>
> >> --
> >>
> >>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message