ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blasteralfred <blasteralf...@gmail.com>
Subject How to set cache name in Kafka connect?
Date Wed, 17 May 2017 07:25:40 GMT
Hi,

I am trying to fetch from Kafka topic to Ignite cache. I am a beginner and
created a code for the same. I don't know how to set Cache name. Also,
kindly review my code that is there any mistakes, which will be really
helpful for me to get started with these.

Thanks in advance..

		Ignition.setClientMode(true);

		try (Ignite ignite =
Ignition.start("D:/Applns/apache-ignite-fabric-1.6.0-bin/apache-ignite-fabric-1.6.0-bin/examples/config/example-ignite.xml"))
{

			KafkaStreamer<String, String, String> kafkaStreamer = new
KafkaStreamer<>();
			
			try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null))
{
				stmr.allowOverwrite(true);
				
				CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>();
				cfg.setName("Democache");
				IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);
				
				kafkaStreamer.setIgnite(ignite);
				kafkaStreamer.setStreamer(stmr);
				kafkaStreamer.setTopic("cachetest3");
				kafkaStreamer.setThreads(4);
				
				Properties settings = new Properties();
				settings.put("bootstrap.servers", "192.168.15.120:9092");
				settings.put("group.id", "test");
				settings.put("zookeeper.connect", "192.168.15.120:2181");
			
settings.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
			
settings.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
			
settings.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
			
settings.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
				
				kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);
				kafkaStreamer.setConsumerConfig(config);
				
				StringDecoder strDecoder = new StringDecoder(new
VerifiableProperties());
				kafkaStreamer.setKeyDecoder(strDecoder);
				kafkaStreamer.setValueDecoder(strDecoder);
				
				kafkaStreamer.start();
			} finally {
				kafkaStreamer.stop();
			}

		}

IgniteStream.java
<http://apache-ignite-users.70518.x6.nabble.com/file/n12959/IgniteStream.java>  



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/How-to-set-cache-name-in-Kafka-connect-tp12959.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Mime
View raw message