From: Aljoscha Krettek <aljoscha@apache.org>
Subject: Re: Stateful streaming question
Date: Fri, 16 Jun 2017 11:55:57 +0200
To: Flavio Pompermaier <pompermaier@okkam.it>
Cc: Kostas Kloudas <k.kloudas@data-artisans.com>, Fabian Hüske <fhueske@gmail.com>, "Jain, Ankit" <ankit.jain@here.com>, user <user@flink.apache.org>

I think it might be possible to do, but I'm not aware of anyone working on that, and I haven't seen anyone on the mailing lists express interest in working on it.
On 16. Jun 2017, at 11:31, Flavio Pompermaier <pompermaier@okkam.it> wrote:

Ok, thanks for the clarification. Do you think it could be possible (sooner or later) to have in Flink some sort of synchronization between jobs (as in this case, where the input datastream should be "paused" until the second job finishes)? I know I could use something like Oozie or Falcon to orchestrate jobs, but I'd prefer to avoid adding them to our architecture.

Best,
Flavio

On Fri, Jun 16, 2017 at 11:23 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
Hi,

I'm afraid not. You would have to wait for one job to finish before starting the next one.

Best,
Aljoscha

On 15. Jun 2017, at 20:11, Flavio Pompermaier <pompermaier@okkam.it> wrote:

Hi Aljoscha,
we're still investigating possible solutions here. Yes, as you correctly said, there are links between data of different keys, so we can only proceed with the next job once we are 100% sure that all input data has been consumed and no other data will be read until this last job ends.
There should be some sort of synchronization between these two jobs... is that possible right now in Flink?
Thanks a lot for the support,
Flavio

On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
Hi,

Trying to revive this somewhat older thread: have you made any progress? I think going with a ProcessFunction that keeps all your state internally and periodically outputs to, say, Elasticsearch using a sink seems like the way to go. You can do the periodic emission using timers in the ProcessFunction.
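A minimal sketch of that timer pattern, assuming the function runs on a keyed stream (MyEvent, MyGroupedObj, and the one-hour interval are placeholders, not something agreed on in this thread):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicEmitFunction extends ProcessFunction<MyEvent, MyGroupedObj> {

  private static final long ONE_HOUR = 60 * 60 * 1000L; // placeholder emission interval

  private transient ValueState<MyGroupedObj> state;
  private transient ValueState<Boolean> timerSet;

  @Override
  public void open(Configuration config) {
    state = getRuntimeContext().getState(
        new ValueStateDescriptor<>("grouped", TypeInformation.of(MyGroupedObj.class)));
    timerSet = getRuntimeContext().getState(
        new ValueStateDescriptor<>("timer-set", TypeInformation.of(Boolean.class)));
  }

  @Override
  public void processElement(MyEvent event, Context ctx, Collector<MyGroupedObj> out) throws Exception {
    MyGroupedObj current = state.value();
    if (current == null) {
      current = new MyGroupedObj();
    }
    current.addTuple(event); // update the state, but do not emit here
    state.update(current);

    if (timerSet.value() == null) { // arm one recurring timer per key
      ctx.timerService().registerProcessingTimeTimer(
          ctx.timerService().currentProcessingTime() + ONE_HOUR);
      timerSet.update(true);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<MyGroupedObj> out) throws Exception {
    MyGroupedObj current = state.value();
    if (current != null) {
      out.collect(current); // the periodic snapshot goes to the sink (e.g. Elasticsearch)
    }
    ctx.timerService().registerProcessingTimeTimer(timestamp + ONE_HOUR); // re-arm
  }
}

This way the sink sees one snapshot per key per interval instead of one write per update.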
In your use case, does the data you would store in the Flink managed state have links between data of different keys? This sounds like it could be a problem when it comes to consistency when outputting to an external system.

Best,
Aljoscha
On 17. May 2017, at 14:12, Flavio Pompermaier <pompermaier@okkam.it> wrote:

Hi to all,
there are a lot of useful discussion points :)

I'll try to answer everybody.

@Ankit:
  • right now we're using Parquet on HDFS to store thrift objects. Those objects are essentially structured like
    • key
    • alternative_key
    • list of tuples (representing the state of my Object)
    • This model could potentially be modeled as a Monoid, and it's very well suited to a stateful streaming computation, where updating a single key's state is not as expensive as calling a db to get the current list of tuples and writing the updated list back (IMHO). Maybe here I'm overestimating Flink's streaming capabilities...
  • serialization should be ok using thrift, but Flink advises using tuples for better performance, so just after reading the data from disk (as a ThriftObject) we convert it to its equivalent Tuple3<String, String, List<Tuple4>> representation
  • Since I currently use Flink to ingest data that (in the end) means adding tuples to my objects, it would be perfect to have an "online" state of the grouped tuples in order to:
    • add/remove tuples to my object very quickly
    • from time to time, scan the whole online data (or a part of it) and "translate" it into one or more JSON indices (and put them into Elasticsearch)
@Fabian:
You're right that batch processes are not very well suited to working with services that can fail... if a remote call in a map function fails, the whole batch job fails... this should be less problematic with streaming, because there's checkpointing, and with async I/O it should be possible to add retry/backoff policies so as not to overload remote services like dbs or solr/es indices (maybe that's not there already, but it should be possible to add). Am I wrong?
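As a rough illustration only, such a retry/backoff policy could be layered inside an AsyncFunction along these lines (this assumes the Flink 1.2/1.3-era AsyncCollector API; DbClient and the retry/backoff values are made-up placeholders, not a real client):

import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class RetryingAsyncEnricher extends RichAsyncFunction<String, Tuple2<String, String>> {

  private static final int MAX_RETRIES = 3;        // placeholder policy
  private static final long BASE_BACKOFF_MS = 100; // placeholder policy

  private transient DbClient client;               // hypothetical async db client
  private transient ScheduledExecutorService retryExecutor;

  @Override
  public void open(Configuration parameters) {
    client = new DbClient();
    retryExecutor = Executors.newSingleThreadScheduledExecutor();
  }

  @Override
  public void asyncInvoke(String key, AsyncCollector<Tuple2<String, String>> collector) {
    attempt(key, collector, 0);
  }

  private void attempt(String key, AsyncCollector<Tuple2<String, String>> collector, int retry) {
    client.lookup(key).whenComplete((value, error) -> { // assumed to return a CompletableFuture<String>
      if (error == null) {
        collector.collect(Collections.singleton(new Tuple2<>(key, value)));
      } else if (retry < MAX_RETRIES) {
        long backoff = BASE_BACKOFF_MS << retry; // 100ms, 200ms, 400ms
        retryExecutor.schedule(() -> attempt(key, collector, retry + 1),
            backoff, TimeUnit.MILLISECONDS);
      } else {
        collector.collect(error); // give up: fail the record, the job will restart
      }
    });
  }
}

It would be wired in with something like AsyncDataStream.unorderedWait(keys, new RetryingAsyncEnricher(), 10000, TimeUnit.MILLISECONDS, 100).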
@Kostas:

From what I understood, Queryable State is useful for gets... what if I need to scan the entire db? For us it could be better to periodically dump the state to RocksDB or HDFS but, as I already said, I'm not sure whether it is safe to start a batch job that reads the dumped data while, in the meantime, a possible update of this dump could happen... is there any potential problem for data consistency (indeed, tuples within grouped objects have references to other objects' keys)?

Best,
Flavio
On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
Hi Flavio,

For setting the retries, unfortunately there is no such setting yet and, if I am not wrong, in case of a failed request an exception will be thrown and the job will restart. I am also including Till in the thread, as he may know better.

For consistency guarantees and concurrency control, this depends on your underlying backend. But if you want to have end-to-end control, you could do as Ankit suggested in his point 3), i.e. have a single job for the whole pipeline (if this fits your needs, of course). This will allow you to set your own "precedence" rules for your operations.

Now finally, there is currently no way to expose the state of a job to another job. The way to do so is either Queryable State, or writing to a Sink. If the problem with having one job is that you emit one element at a time, you can always group elements together and emit downstream less often, in batches.
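A hedged sketch of that batching idea (the 500-element batch size is a placeholder; note that countWindowAll collapses the stream to parallelism 1, while a keyed countWindow would keep it parallel):

DataStream<List<MyGroupedObj>> batched = updates
    .countWindowAll(500) // placeholder batch size
    .apply(new AllWindowFunction<MyGroupedObj, List<MyGroupedObj>, GlobalWindow>() {
      @Override
      public void apply(GlobalWindow window, Iterable<MyGroupedObj> values,
                        Collector<List<MyGroupedObj>> out) {
        List<MyGroupedObj> batch = new ArrayList<>();
        for (MyGroupedObj v : values) {
          batch.add(v);
        }
        out.collect(batch); // one sink write per batch instead of per element
      }
    });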
Finally, if you need 2 jobs, you can always use a hybrid solution where you keep your current state in Flink and dump it to a queryable Sink, once per week for example. The Sink can then be queried at any time, and the data will be at most one week old.

Thanks,
Kostas
On May 17, 2017, at 9:35 AM, Fabian Hueske <fhueske@gmail.com> wrote:

Hi Ankit, just a brief comment on the "batch job is easier than streaming job" argument. I'm not sure about that.
I can see that just the batch job might seem easier to implement, but this is only one part of the whole story. The operational side of using batch is more complex IMO.
You need a tool to ingest your stream, you need storage for the ingested data, you need a periodic scheduler to kick off your batch job, and you need to take care of failures if something goes wrong.
In the streaming case, this is not needed, or the framework does it for you.

Just my 2 cents, Fabian
2017-05-16 20:58 GMT+02:00 Jain, Ankit <ankit.jain@here.com>:

Hi Flavio,

While you wait on an update from Kostas, I wanted to understand the use case better and share my thoughts-

1) Why is the current batch mode expensive? Where are you persisting the data after updates? The way I see it, by moving to Flink you get to use RocksDB (a key-value store), which makes your lookups faster - probably right now you are using a non-indexed store like S3, maybe? (A sketch of enabling the RocksDB state backend follows below this message.)

So the gain is coming from moving to a persistence store better suited to your use case, rather than from batch->streaming. Maybe consider just going with a different data store.

IMHO, streaming should only be used if you really want to act on new events in real time. It is generally harder to get a streaming job correct than a batch one.

2) If the current setup is expensive due to serialization/deserialization, then that should be fixed by moving to a faster format (maybe Avro? - I don't have a lot of expertise in that). I don't see how that problem will go away with Flink - you still need to handle serialization.

3) Even if you do decide to move to Flink - I think you can do this with one job; two jobs are not needed. At every incoming event, check the previous state and update/output to Kafka or whatever data store you are using.

Thanks
Ankit
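For reference, a hedged sketch of enabling the RocksDB state backend mentioned above (the checkpoint URI, interval, and job name are placeholders; this needs the flink-statebackend-rocksdb dependency):

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // keyed state lives in RocksDB on local disk and is checkpointed to the given URI
  env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints")); // placeholder URI
  env.enableCheckpointing(60000); // checkpoint every minute (placeholder)
  // ... build the keyed, stateful pipeline here ...
  env.execute("stateful-ingestion"); // placeholder job name
}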
From: Flavio Pompermaier <pompermaier@okkam.it>
Date: Tuesday, May 16, 2017 at 9:31 AM
To: Kostas Kloudas <k.kloudas@data-artisans.com>
Cc: user <user@flink.apache.org>
Subject: Re: Stateful streaming question

Hi Kostas,

thanks for your quick response.

I also thought about using Async IO, I just need to figure out how to correctly handle parallelism and the number of async requests.

However, that's probably the way to go... is it also possible to set a number of retry attempts/backoff for when the async request fails (maybe due to a too busy server)?

For the second part, I think it's ok to persist the state into RocksDB or HDFS; my question is indeed about that: is it safe to start reading (with another Flink job) from RocksDB or HDFS while an updatable state is "pending" on it? Should I ensure that state updates are not possible until the other Flink job has finished reading the persisted data?

And another question... I've tried to draft such a process, and basically I have the following code:
DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
    .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {

      private transient ValueState<MyGroupedObj> state;

      @Override
      public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
        MyGroupedObj current = state.value();
        if (current == null) {
          current = new MyGroupedObj();
        }
        ....
        current.addTuple(t);
        ...
        state.update(current);
        out.collect(current);
      }

      @Override
      public void open(Configuration config) {
        ValueStateDescriptor<MyGroupedObj> descriptor =
            new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
        state = getRuntimeContext().getState(descriptor);
      }
    });

groupedObj.print();
but obviously this way I emit the updated object on every update while, actually, I just want to persist the ValueState somehow (and make it available to another job that runs once/month, for example). Is that possible?
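One hedged workaround, staying with the flatMap above: keep a per-key update counter in state and emit a snapshot only every N updates (the 1000 threshold and the "updates" state name are placeholders):

  // extra keyed state, registered in open() next to `state`:
  //   updateCount = getRuntimeContext().getState(
  //       new ValueStateDescriptor<>("updates", TypeInformation.of(Integer.class)));
  private transient ValueState<Integer> updateCount;

  @Override
  public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
    MyGroupedObj current = state.value();
    if (current == null) {
      current = new MyGroupedObj();
    }
    current.addTuple(t);
    state.update(current);

    Integer n = updateCount.value();
    n = (n == null) ? 1 : n + 1;
    if (n >= 1000) {        // placeholder threshold
      out.collect(current); // emit a snapshot only occasionally
      n = 0;
    }
    updateCount.update(n);
  }

The ProcessFunction-with-timers pattern sketched earlier in the thread is the time-based variant of the same idea, which fits a "runs once a month" consumer better.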
On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

Hi Flavio,

From what I understand, for the first part you are correct. You can use Flink's internal state to keep your enriched data.

In fact, if you are also querying an external system to enrich your data, it is worth looking at the AsyncIO feature:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

Now for the second part, currently in Flink you cannot iterate over all registered keys for which you have state. A pointer that may be useful to look at is queryable state:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html

This is still an experimental feature, but let us know your opinion if you use it.
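On the job side, exposing keyed state that way is a one-liner; a hedged sketch (the external name "grouped-objects" is a placeholder, and the client-side query API is considerably more involved):

// expose the keyed state of the stream for external point queries
QueryableStateStream<Tuple, MyGroupedObj> queryable = tuples
    .keyBy(0)
    .asQueryableState("grouped-objects"); // placeholder external name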
Finally, an alternative would be to keep the state in Flink and periodically flush it to an external storage system, which you can query at will.

Thanks,
Kostas
On May 16, 2017, at 4:38 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

Hi to all,

we're still playing with the Flink streaming part in order to see whether it can improve our current batch pipeline.

At the moment, we have a job that translates incoming data (as Row) into Tuple4, groups the tuples by the first field and persists the result to disk (using a thrift object). When we need to add tuples to those grouped objects, we need to read the persisted data again, flatten it back to Tuple4, union it with the new tuples, re-group by key and finally persist again.
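In DataSet API terms that round-trip presumably looks something like the following hedged sketch (MyThriftObj, readPersisted, flattenToTuples, rebuildGroupedObject, and MyThriftOutputFormat are placeholder names for illustration):

DataSet<Tuple4<String, String, String, String>> oldTuples = readPersisted(env)
    .flatMap(new FlatMapFunction<MyThriftObj, Tuple4<String, String, String, String>>() {
      @Override
      public void flatMap(MyThriftObj obj, Collector<Tuple4<String, String, String, String>> out) {
        for (Tuple4<String, String, String, String> t : flattenToTuples(obj)) {
          out.collect(t); // flatten each persisted grouped object back into tuples
        }
      }
    });

DataSet<MyThriftObj> regrouped = oldTuples
    .union(newTuples) // union with the freshly ingested tuples
    .groupBy(0)       // re-group everything by key
    .reduceGroup(new GroupReduceFunction<Tuple4<String, String, String, String>, MyThriftObj>() {
      @Override
      public void reduce(Iterable<Tuple4<String, String, String, String>> values,
                         Collector<MyThriftObj> out) {
        out.collect(rebuildGroupedObject(values)); // rebuild one grouped object per key
      }
    });

regrouped.output(new MyThriftOutputFormat()); // persist everything again (placeholder format)

Every incremental update pays for a full read-flatten-union-regroup-write cycle, which is exactly the cost in question.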
This is very expensive to do with batch computation, while it should be pretty straightforward to do with streaming (from what I understood): I just need to use ListState. Right?
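A hedged fragment of that ListState idea (the state name, the tuple types, and the String output type are placeholders):

public class CollectTuples
    extends RichFlatMapFunction<Tuple4<String, String, String, String>, String> {

  private transient ListState<Tuple4<String, String, String, String>> tuplesState;

  @Override
  public void open(Configuration config) {
    tuplesState = getRuntimeContext().getListState(
        new ListStateDescriptor<>("tuples", // placeholder state name
            TypeInformation.of(new TypeHint<Tuple4<String, String, String, String>>() {})));
  }

  @Override
  public void flatMap(Tuple4<String, String, String, String> t, Collector<String> out)
      throws Exception {
    tuplesState.add(t); // appending to the key's list is cheap, no external read-modify-write
  }
}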
Then, let's say I need to scan all the data of the stateful computation (keys and values) in order to do some other computation. I'd like to know:

  • how to do that? I.e. create a DataSet/DataSource<Key, Value> from the stateful data in the stream
  • is there any problem with accessing the stateful data without stopping incoming data (and thus possible updates to the states)?
Thanks in advance for the support,
Flavio

--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908