kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From HUI CHEN <ielts0...@gmail.com>
Subject Different Result in Consumer
Date Thu, 24 Jan 2013 13:53:03 GMT
I write the code just same as the code given on kafka website like this:
package com.a2.kafka.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class CommonConsumer {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector =

Map<String, Integer> map=new HashMap<String,Integer>();
map.put("test", 4);
// create 4 partitions of the stream for topic “test”, to allow 4 threads
to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
List<KafkaStream<Message>> streams = topicMessageStreams.get("test");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
  executor.submit(new Runnable() {
    public void run() {
      for(MessageAndMetadata<Message> msgAndMetadata: stream) {
        // process message (msgAndMetadata.message())

but sometimes the program can get All info from topic ,but sometimes it can
get nothing from the client .
I want to count the total of the log in this topic, what should i write the
thanks very much.

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message