ignite-user mailing list archives

From Ilya Kasnacheev <ilya.kasnach...@gmail.com>
Subject Re: Questions on IgniteDataStreamer
Date Tue, 11 Jun 2019 14:50:47 GMT
Hello!

Can you please at least share the exceptions you are getting?

Regards,
-- 
Ilya Kasnacheev


Sat, 1 Jun 2019 at 13:55, Om Thacker <omi.thacker08@gmail.com> wrote:

> Hello vbm,
>
> I am working on the exact same problem. Did you find a solution?
> I am using the following code in my client application, which listens to
> Kafka Connect (Confluent).
>
> I have a one-to-one mapping between Kafka topics and Ignite caches. When
> there is an insert into the DB, the Kafka listener receives it, I convert
> the JSON to an object with the Gson library, and stmr.addData() works fine.
> But when a value is updated in the DB, I get a marshaller error. I tried
> the cache.put() method instead, but it gives me a cachewriteexception.
>
>
> @KafkaListener(topics = { "kafka-Users" })
> public void listenUsers(String message) {
>     logger.error(message);
>     ObjectMapper mapper = new ObjectMapper();
>     JsonNode rootNode;
>     try {
>         rootNode = mapper.readTree(message);
>         Users user = new Users();
>         IgniteDataStreamer<Long, Users> stmr =
>             ignite.dataStreamer(IgniteProperties.USERS_CACHE.getName());
>         // stmr.allowOverwrite(true);
>
>         /*
>          * stmr.receiver(new StreamTransformer<Long, Users>() {
>          *
>          *     @Override
>          *     public Object process(MutableEntry<Long, Users> entry, Object... arguments)
>          *             throws EntryProcessorException {
>          *         return null;
>          *     }
>          *
>          * });
>          */
>
>         /*
>          * stmr.receiver(StreamTransformer.from((e, arg) -> {
>          *     Users val = e.getValue();
>          *     System.out.println(val + " user from reciever $$$$$$$$$");
>          *     return null;
>          * }));
>          */
>
>         Gson gson = new GsonBuilder()
>             .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
>             .create();
>         user = gson.fromJson(rootNode.get("payload").toString(), Users.class);
>
>         stmr.addData(rootNode.get("payload").get("UsersKey").asLong(), user);
>         stmr.flush();
>         // stmr.allowOverwrite(true);
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }
>
>
>
>
> Can you please share your solution?
> Thanks,
> Om Thacker
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

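Without the stack trace the marshaller error itself cannot be diagnosed. One thing in the quoted listener can be checked independently, though: both stmr.allowOverwrite(true) calls are commented out, and IgniteDataStreamer leaves that flag off by default, in which case entries whose key already exists in the cache are not replaced. The sketch below is a toy, JDK-only model of that flag, not the Ignite API. The class StreamerOverwriteModel and its in-memory map are hypothetical and exist only to illustrate the behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model (hypothetical, NOT the Ignite API): it only mimics how
// an allowOverwrite-style flag decides whether an existing key is replaced.
final class StreamerOverwriteModel<K, V> {
    private final Map<K, V> cache = new HashMap<>();

    // Off by default, mirroring the default of the real streamer flag.
    private boolean allowOverwrite;

    void allowOverwrite(boolean allow) {
        this.allowOverwrite = allow;
    }

    void addData(K key, V val) {
        if (allowOverwrite) {
            cache.put(key, val);          // update replaces the old value
        } else {
            cache.putIfAbsent(key, val);  // update to an existing key is skipped
        }
    }

    V get(K key) {
        return cache.get(key);
    }

    public static void main(String[] args) {
        StreamerOverwriteModel<Long, String> stmr = new StreamerOverwriteModel<>();

        stmr.addData(1L, "alice");            // initial insert
        stmr.addData(1L, "alice-updated");    // update with the flag off
        System.out.println(stmr.get(1L));     // prints "alice"

        stmr.allowOverwrite(true);            // enable before adding the update
        stmr.addData(1L, "alice-updated");
        System.out.println(stmr.get(1L));     // prints "alice-updated"
    }
}
```

In the real streamer the flag would be set right after ignite.dataStreamer(...) and before addData(). The second commented-out call in the quoted code sits after flush(), where it would presumably be too late to affect the entry that was just sent.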