ignite-user mailing list archives

From Humphrey <hmmlo...@gmail.com>
Subject Re: Kindly tell me where to find these jar files.
Date Sun, 14 May 2017 17:08:37 GMT
Here is some code that I used. I wrapped the Ignite Kafka streamer in an
Ignite service and deployed it as a cluster singleton.

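import java.util.AbstractMap;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.stream.kafka.KafkaStreamer;

// CACHE_NAME, KAFKA_TOPIC and Util are defined elsewhere (not shown here).
public class IgniteKafkaStreamerService implements Service {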

	private static final long serialVersionUID = 1L;
	
	@IgniteInstanceResource
	private Ignite ignite;
	private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
	private IgniteLogger logger;

	@Override
	public void init(ServiceContext ctx) throws Exception {
		logger = ignite.log();
		IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(CACHE_NAME);
		stmr.allowOverwrite(true);
		stmr.autoFlushFrequency(1000);
		
		kafkaStreamer.setIgnite(ignite);
		kafkaStreamer.setStreamer(stmr);
		kafkaStreamer.setThreads(4);
		kafkaStreamer.setTopic(KAFKA_TOPIC);
		// Some code to read in the kafka properties
		Properties kafkaProps = Util.loadProperties("config/kafka.properties");
		kafkaStreamer.setConsumerConfig(new ConsumerConfig(kafkaProps));
		kafkaStreamer.setSingleTupleExtractor(msg ->
			new AbstractMap.SimpleEntry<String, String>(new String(msg.key()), new String(msg.message())));
	}

	@Override
	public void execute(ServiceContext ctx) throws Exception {
		kafkaStreamer.start();
		logger.info("KafkaStreamer started.");
	}
	
	@Override
	public void cancel(ServiceContext ctx) {
		kafkaStreamer.stop();
		logger.info("KafkaStreamer stopped.");
	}

}
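
The Util.loadProperties call in the service is not shown in this message. A minimal sketch of what such a helper could look like, assuming the properties live in a plain file on disk (the class body below is hypothetical, only the name and the call come from the code above):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: the original Util class is not part of this message.
final class Util {

	private Util() {
		// Static helper, not meant to be instantiated.
	}

	/** Loads a .properties file from the given file system path. */
	static Properties loadProperties(String path) {
		Properties props = new Properties();
		// try-with-resources closes the stream even if load() fails.
		try (InputStream in = Files.newInputStream(Paths.get(path))) {
			props.load(in);
		} catch (IOException e) {
			throw new UncheckedIOException("Cannot read " + path, e);
		}
		return props;
	}
}
```

For the consumer that ConsumerConfig wraps, the file would typically hold at least entries such as zookeeper.connect and group.id; check the Kafka version you use for the exact keys it requires.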

Below is the code to start it up:

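import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.CacheConfiguration;

public class IgniteNodeStartup {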

	public static void main(String[] args) {
		// Start an Ignite server node with the default configuration
		Ignite ignite = Ignition.start();
		ignite.getOrCreateCache(getCacheConfiguration());
		// Deploy data streamer service on the server nodes.
		ClusterGroup forServers = ignite.cluster().forServers();
		ignite.services(forServers).deployClusterSingleton("KafkaService", new IgniteKafkaStreamerService());
	}
	
	private static CacheConfiguration<String, String> getCacheConfiguration() {
		CacheConfiguration<String, String> cfg = new CacheConfiguration<>();
		cfg.setName(CACHE_NAME);
		cfg.setBackups(1);
		return cfg;
	}
}
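
One note on the tuple extractor in the service: new String(bytes) uses the platform default charset and throws a NullPointerException when a Kafka message has no key. A small sketch of a null-safe UTF-8 conversion, assuming the topic carries UTF-8 text (KafkaTuples and its methods are hypothetical names, not part of Ignite or Kafka):

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical helper for building cache entries from raw Kafka bytes.
final class KafkaTuples {

	private KafkaTuples() {
		// Static helper, not meant to be instantiated.
	}

	/** Decodes bytes as UTF-8; returns null for a null input instead of throwing. */
	static String utf8(byte[] bytes) {
		return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
	}

	/** Builds a key/value entry from the raw key and message bytes. */
	static Map.Entry<String, String> toEntry(byte[] key, byte[] message) {
		return new AbstractMap.SimpleEntry<>(utf8(key), utf8(message));
	}
}
```

The extractor could then be written as msg -> KafkaTuples.toEntry(msg.key(), msg.message()). Ignite caches do not accept null keys, so messages without a key still need to be filtered out or given a generated key before they reach the data streamer.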

I hope this helps.

Humphrey



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-tp12649p12836.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.