zookeeper-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "vineet.salian" <salianvin...@gmail.com>
Subject Re: Unusual exception
Date Wed, 23 Mar 2016 06:33:14 GMT
Hi Avinash,
I am facing the same problem. 
I am unable to detect the source of the problem: the producer program is
working, but the consumer is not.
Can you please help me out?

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
public class HelloKafkaConsumer extends  Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "test";
    ConsumerConnector consumerConnector;


    public static void main(String[] argv) throws
UnsupportedEncodingException {
        HelloKafkaConsumer helloKafkaConsumer = new HelloKafkaConsumer();
        helloKafkaConsumer.start();
    }

    public HelloKafkaConsumer(){
    	try{
        Properties properties = new Properties();
        properties.put("zookeeper.connect","localhost:2181");
        System.out.println("Hi"+properties);
        properties.put("group.id","test-group");
        System.out.println("Hi1"+properties);
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        System.out.println("hi2"+consumerConfig);
        consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);
    	System.out.println("hi3"+consumerConnector);
        }catch(Exception e){}
    }

    @Override
    public void run() {
    	try{
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
System.out.println("topic count map "+topicCountMap);
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List&lt;KafkaStream&lt;byte[], byte[]>>> consumerMap
=
consumerConnector.createMessageStreams(topicCountMap);
        System.out.println("------------------------->");
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(it.hasNext())
            System.out.println(new String(it.next().message()));
    	}catch(Exception e){ System.out.print("Inside run"+e);
e.printStackTrace(); }
    }

    private static void printMessages(ByteBufferMessageSet messageSet)
throws UnsupportedEncodingException {
        	for(MessageAndOffset messageAndOffset: messageSet) 
        	{
        		try{
        		ByteBuffer payload = messageAndOffset.message().payload();
        		byte[] bytes = new byte[payload.limit()];
        		payload.get(bytes);
        		System.out.println(new String(bytes, "UTF-8"));
        		}catch(Exception e){}
        	}
    }
}

This is the consumer program.

Regards,
Vineet



--
View this message in context: http://zookeeper-user.578899.n2.nabble.com/Unusual-exception-tp5632833p7582198.html
Sent from the zookeeper-user mailing list archive at Nabble.com.

Mime
View raw message