flink-dev mailing list archives

From "Albert Gimenez" <albert.gimenez.mora...@gmail.com>
Subject DataStream of Future
Date Fri, 16 Sep 2016 11:16:05 GMT
Hello,

 

I recently started learning Flink (using Scala), and I developed a job
that, in short, does the following:

 

- Reads JSON messages from Kafka

- Enriches the messages with data read from Cassandra (using Phantom DSL)

- Writes the enriched messages to another Kafka topic

 

The job looks like this:

 

env
    .addSource(new FlinkKafkaConsumer09[String](...)) // read JSON messages from Kafka
    .map(MyService.enrichMessage _) // Returns Option
    .filter(_.isDefined)
    .map(_.get)
    .map(enrichedMessageToJsonMapper)
    .addSink(new FlinkKafkaProducer09[String](...)) // write enriched messages to Kafka

 

The "enrichMessage" method is where I'm using Phantom DSL to query
Cassandra, and I would like to return a Future, but I can't figure out a way
to do it right now, so I'm using "Await" to force the resolution and return
a result. Is there a way to use a Future here?
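
A minimal sketch of the blocking workaround described above, for illustration only. The `lookup` function is a hypothetical stand-in for the Phantom DSL query, and the object name, the 5-second timeout, and the `|` separator are assumptions that do not come from the actual job.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object EnrichSketch {
  // Hypothetical stand-in for the Cassandra query: an asynchronous
  // lookup that yields extra data for a non-empty key, or None.
  def lookup(key: String): Future[Option[String]] =
    Future(if (key.nonEmpty) Some(s"extra-for-$key") else None)

  // Blocking version: waits for the Future so that map() receives a
  // plain Option. A timeout or failed lookup is treated as "no result".
  def enrichMessage(msg: String): Option[String] =
    Try(Await.result(lookup(msg), 5.seconds)) // assumed timeout
      .toOption
      .flatten
      .map(extra => s"$msg|$extra")
}
```

With this shape, `EnrichSketch.enrichMessage _` can be passed to `map` exactly like `MyService.enrichMessage _`, at the cost of blocking the operator thread for the duration of each lookup.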

 

I do have a second job that updates the data in Cassandra. Since it doesn't
need a sink, its map can return the Futures and everything happens
asynchronously. I would like to know whether similar behavior is possible
when I do want to use a sink (that is, sink to Kafka as the Futures
complete).

 

BTW, I'm using Flink 1.1.2 with Scala 2.11.

 

Thanks a lot for your help!

 

Kind regards,

 

Albert


