kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: Creating a Consumer Thread Pool
Date Wed, 28 Aug 2013 15:24:41 GMT
Could you remove the following statement and see if it works?

System.out.println("Created iterator " + it.toString() + " thread number "
+ threadNumber);

Thanks,

Jun


On Tue, Aug 27, 2013 at 3:43 PM, David Williams <dwilliams@truecar.com>wrote:

>
> Hi all,
>
> I checked out the java source and looked at the java examples.  They
> worked well in my IDE and on the console.  However, I also tried the
> threaded example following the consumer group example.  The problem is,
> this example is not working and toString on the stream iterator returns the
> words "empty iterator".  Below, run2() method is the run method from the
> source code, THAT WORKS.  The run() method below is from the Consumer Group
> Example and DOES NOT WORK.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>
> It simply prints messages like
>
> Created iterator empty iterator thread number 9
> Created iterator empty iterator thread number 1
> Shutting down Thread: 1
> Created iterator empty iterator thread number 3
>
> And continues doing so as I produce message using the console producer and
> does not print messages.
>
>
>
>
> Im not sure if this is a versioning issue, or what might be the cause.
> But help is appreciated!
>
>
>
> Here is the Consumer class:
>
> import kafka.consumer.KafkaStream;
> import kafka.consumer.ConsumerIterator;
>
> public class Consumer implements Runnable {
>
>     private KafkaStream kafkaStream;
>     private Integer threadNumber;
>
>     public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
>         this.threadNumber = threadNumber;
>         this.kafkaStream = kafkaStream;
>     }
>
>     public void run() {
>         ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
>         System.out.println("Created iterator " + it.toString() + " thread
> number " + threadNumber);
>         while(it.hasNext()) {
>             System.out.println("Thread " + threadNumber + ": " + new
> String(it.next().message()));
>
>             // validate
>             // enrich
>             // dispatch
>         }
>         System.out.println("Shutting down Thread: " + threadNumber);
>     }
>
> }
>
>
>
>
> In my ConsumerThreadPool class:
>
>
> public class ConsumerThreadPool {
>
>     private final ConsumerConnector consumer;
>     private final String topic;
>
>     private ExecutorService executor;
>     private static ApplicationContext context = new
> AnnotationConfigApplicationContext(AppConfig.class);
>
>     public ConsumerThreadPool(String topic) {
>         consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
>         this.topic = topic;
>     }
>
>     public void shutdown() {
>         if (consumer != null) consumer.shutdown();
>         if (executor != null) executor.shutdown();
>     }
>
>     public void run(Integer numThreads) {
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
>         topicCountMap.put(topic, new Integer(numThreads));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> consumer.createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
>         // create threads
>         executor = Executors.newFixedThreadPool(numThreads);
>
>         // now create an object to consume the messages
>         Integer threadNumber = 0;
>         for(KafkaStream<byte[], byte[]> stream : streams) {
>             executor.submit(new Consumer(stream, threadNumber));
>             threadNumber++;
>         }
>     }
>
>
>     public void run2() {
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
>         topicCountMap.put(topic, new Integer(1));
>
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> consumer.createMessageStreams(topicCountMap);
>
>         KafkaStream<byte[], byte[]> stream =
>  consumerMap.get(topic).get(0);
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while(true) {
>             try {
>                 Thread.sleep(1000);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>             while(it.hasNext()){
>                 System.out.println(new String(it.next().message()));
>
>             }
>         }
>     }
>
> }
>
>
>
> The AppConfig is pretty simple:
>
> @Configuration
> @ComponentScan("com.truecar.inventory.worker.core")
> public class AppConfig {
>
>     @Bean
>     @Named("sharedProducerConsumerConfig")
>     private static Properties sharedProducerConsumerConfig() {
>         Properties properties = new Properties();
>         properties.put("zookeeper.connect", "127.0.0.1:2181");
>         properties.put("group.id", "intelligence");
>         properties.put("zookeeper.session.timeout.ms", "400");
>         properties.put("zookeeper.sync.time.ms", "200");
>         properties.put("auto.commit.interval.ms", "1000");
>         return properties;
>     }
>
>     @Bean
>     @Named("consumerConfig")
>     private static ConsumerConfig consumerConfig() {
>         Properties properties = sharedProducerConsumerConfig();
>         return new ConsumerConfig(properties);
>     }
>
>     @Bean
>     @Named("producerConfig")
>     private static ProducerConfig producerConfig() {
>         Properties properties = sharedProducerConsumerConfig();
>         properties.put("serializer.class",
> "kafka.serializer.StringEncoder");
>         properties.put("metadata.broker.list", "localhost:9092");
>         return new ProducerConfig(properties);
>     }
>
> }
>
>
> --
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message