From: Aljoscha Krettek
Subject: Re: Stateful streaming question
Date: Thu, 15 Jun 2017 12:16:18 +0200
To: Flavio Pompermaier
Cc: Kostas Kloudas, Fabian Hüske, "Jain, Ankit", user@flink.apache.org

Hi,

Trying to revive this somewhat older thread: have you made any progress? I think going with a ProcessFunction that keeps all your state internally and periodically outputs to, say, Elasticsearch using a sink seems like the way to go. You can do the periodic emission using timers in the ProcessFunction.
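To make that concrete, here is a minimal sketch of the pattern, not a definitive implementation: MyGroupedObj, the raw Tuple4 input and the 10-minute interval are placeholders borrowed from the discussion below, and the API shown is roughly Flink 1.3's ProcessFunction.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Accumulates updates per key; emits only when a processing-time timer
// fires, not once per incoming element.
public class PeriodicEmitFunction extends ProcessFunction<Tuple4, MyGroupedObj> {

  private static final long INTERVAL = 10 * 60 * 1000L; // placeholder: 10 minutes

  private transient ValueState<MyGroupedObj> state;

  @Override
  public void open(Configuration config) {
    state = getRuntimeContext().getState(new ValueStateDescriptor<>(
        "grouped", TypeInformation.of(MyGroupedObj.class)));
  }

  @Override
  public void processElement(Tuple4 t, Context ctx, Collector<MyGroupedObj> out)
      throws Exception {
    MyGroupedObj current = state.value();
    if (current == null) {
      current = new MyGroupedObj();
      // first element for this key: schedule the first periodic emission
      ctx.timerService().registerProcessingTimeTimer(
          ctx.timerService().currentProcessingTime() + INTERVAL);
    }
    current.addTuple(t);
    state.update(current); // no out.collect() here: nothing leaves per update
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<MyGroupedObj> out)
      throws Exception {
    if (state.value() != null) {
      out.collect(state.value()); // periodic snapshot, e.g. towards an Elasticsearch sink
    }
    ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL); // re-arm
  }
}

This would be applied after keyBy(0), with the emitted stream going to the Elasticsearch sink.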

In your use case, does the data you would store in the Flink managed state have links between data of different keys? This sounds like it could be a problem when it comes to consistency when outputting to an external system.

Best,
Aljoscha
On 17 May 2017, at 14:12, Flavio Pompermaier <pompermaier@okkam.it> wrote:

Hi to all,
there are a lot of useful discussion points :)

I'll try to answer everybody.

@Ankit: 
  • right now we're using Parquet on HDFS to store thrift objects. Those objects are essentially structured like
    • key
    • alternative_key
    • list of tuples = (representing the state of my Object)
    • This model could potentially be modeled as a Monoid and it's very well suited for a stateful streaming computation, where updates to a single key's state are not as expensive as a call to a db to get the current list of tuples and write that list back for every update (IMHO). Maybe here I'm overestimating Flink's streaming capabilities...
  • serialization should be ok using thrift, but Flink advises using tuples for better performance, so just after reading the data from disk (as a ThriftObject) we convert it to its equivalent Tuple3<String, String, List<Tuple4>> representation
  • Since I currently use Flink to ingest data that (in the end) means adding tuples to my objects, it would be perfect to have an "online" state of the grouped tuples in order to:
    • add/remove tuples to my object very = quickly
    • from time to time, scan the whole online data (or a part of it) and "translate" it into one or more JSON indices (and put them into Elasticsearch)
@Fabian:
You're right that batch processes are not very well suited to work with services that can fail... if the remote call in a map function fails, the whole batch job fails... this should be less problematic with streaming because there's checkpointing, and with async IO it should be possible to add some retry/backoff policies in order to not overload remote services like dbs or solr/es indices (maybe it's not already there, but it should be possible to add). Am I wrong?

@Kostas:

From what I understood, Queryable State is useful for gets... what if I need to scan the entire db? For us it could be better to periodically dump the state to RocksDB or HDFS but, as I already said, I'm not sure if it is safe to start a batch job that reads the dumped data while, in the meantime, a possible update of this dump could happen... is there any potential problem for data consistency (indeed, tuples within grouped objects have references to other objects' keys)?

Best,
Flavio

On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
Hi Flavio,

For setting the retries, unfortunately there is no such setting yet and, if I am not wrong, in case of a failure of a request, an exception will be thrown and the job will restart. I am also including Till in the thread as he may know better.
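Until such a setting exists, one can implement retries inside the async function itself. A rough sketch under stated assumptions: DatabaseClient and its query() method (returning a CompletableFuture) are made-up placeholders, and the AsyncCollector API shown is roughly the one of Flink 1.2's async I/O.

import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

// Retries a failed async lookup with exponential backoff; only after the
// retries are exhausted is the record failed (triggering the restart
// behaviour described above).
public class RetryingEnrichFunction extends RichAsyncFunction<String, String> {

  private static final int MAX_RETRIES = 3;

  private transient DatabaseClient client;          // placeholder async client
  private transient ScheduledExecutorService timer;

  @Override
  public void open(Configuration config) {
    client = new DatabaseClient();
    timer = Executors.newSingleThreadScheduledExecutor();
  }

  @Override
  public void asyncInvoke(String key, AsyncCollector<String> collector) {
    attempt(key, collector, 0);
  }

  private void attempt(String key, AsyncCollector<String> collector, int tries) {
    client.query(key).whenComplete((value, error) -> {
      if (error == null) {
        collector.collect(Collections.singleton(value));
      } else if (tries < MAX_RETRIES) {
        long backoffMs = (1L << tries) * 100;        // 100ms, 200ms, 400ms
        timer.schedule(() -> attempt(key, collector, tries + 1),
            backoffMs, TimeUnit.MILLISECONDS);
      } else {
        collector.collect(error);                    // give up: fail the record
      }
    });
  }
}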

For consistency guarantees and concurrency control, this depends on your underlying backend. But if you want to have end-to-end control, then you could do as Ankit suggested at his point 3), i.e. have a single job for the whole pipeline (if this fits your needs of course). This will allow you to set your own "precedence" rules for your operations.

Now finally, there is currently no way to expose the state of a job to another job. The closest alternatives are either Queryable State, or writing to a Sink. If the problem with having one job is that you emit one element at a time, you can always group elements together and emit downstream less often, in batches.
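A hedged sketch of that batching idea, using a count window; the window size of 100 is an arbitrary placeholder, and MyGroupedObj/addTuple are borrowed from the code further down the thread:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Emits one element per 100 updates of a key instead of one per update.
DataStream<MyGroupedObj> batched = tuples
    .keyBy(0)
    .countWindow(100)
    .apply(new WindowFunction<Tuple4, MyGroupedObj, Tuple, GlobalWindow>() {
      @Override
      public void apply(Tuple key, GlobalWindow window, Iterable<Tuple4> updates,
          Collector<MyGroupedObj> out) {
        MyGroupedObj obj = new MyGroupedObj();
        for (Tuple4 t : updates) {
          obj.addTuple(t);
        }
        out.collect(obj); // one downstream emission per batch
      }
    });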
 
Finally, if you need 2 jobs, you can always use a hybrid solution where you keep your current state in Flink, and you dump it to a queryable Sink, say once per week. The Sink can then be queried at any time, and the data will be at most one week old.

Thanks,
Kostas

On May 17, 2017, at 9:35 AM, Fabian Hueske <fhueske@gmail.com> wrote:

Hi Ankit, just a brief comment on the "batch job is easier than streaming job" argument. I'm not sure about that.
I can see that just the batch job might seem easier to implement, but this is only one part of the whole story. The operational side of using batch is more complex IMO.
You need a tool to ingest your stream, you need storage for the ingested data, you need a periodic scheduler to kick off your batch job, and you need to take care of failures if something goes wrong.
In the streaming case, this is either not needed or the framework does it for you.

Just my 2 cents, Fabian

2017-05-16 20:58 GMT+02:00 Jain, Ankit <ankit.jain@here.com>:

Hi Flavio,

While you wait on an update from Kostas, I wanted to understand the use case better and share my thoughts:

 

1) Why is the current batch mode expensive? Where are you persisting the data after updates? The way I see it, by moving to Flink you get to use RocksDB (a key-value store) that makes your lookups faster – probably right now you are using a non-indexed store like S3?

So, the gain is coming from moving to a better persistence store suited to your use case, rather than from batch -> streaming. Maybe consider just going with a different data store.

IMHO, streaming should only be used if you really want to act on the new events in real time. It is generally harder to get a streaming job correct than a batch one.

 

2) If the current setup is expensive due to serialization/deserialization, then that should be fixed by moving to a faster format (maybe Avro? - I don't have a lot of expertise in that). I don't see how that problem will go away with Flink – you still need to handle serialization.

 

3) Even if you do decide to move to Flink – I think you can do this with one job; two jobs are not needed. At every incoming event, check the previous state and update/output to Kafka or whatever data store you are using.

 

 

Thanks

Ankit

 

From: Flavio Pompermaier <pompermaier@okkam.it>
Date: Tuesday, May 16, 2017 at 9:31 AM
To: Kostas Kloudas <k.kloudas@data-artisans.com>
Cc: user <user@flink.apache.org>
Subject: Re: Stateful streaming question

 

Hi Kostas,

thanks for your quick response.

I also thought about using Async IO; I just need to figure out how to correctly handle parallelism and the number of async requests.

However, that's probably the way to go... Is it also possible to set a number of retry attempts / a backoff when the async request fails (maybe due to a too-busy server)?
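For the parallelism part, a small sketch of how the operator is applied (roughly the Flink 1.2 API; the timeout and capacity values are placeholders, and RetryingEnrichFunction refers to the retry sketch earlier in this thread):

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// 'capacity' caps the number of in-flight async requests per operator
// instance; together with the operator parallelism it bounds the load on
// the external service.
DataStream<String> enriched = AsyncDataStream.unorderedWait(
    keys,                          // DataStream<String> of lookup keys
    new RetryingEnrichFunction(),  // async function with retry/backoff
    10, TimeUnit.SECONDS,          // timeout per request
    100);                          // capacity: max concurrent requests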

 

For the second part I think it's ok to persist the state into RocksDB or HDFS; my question is indeed about that: is it safe to start reading (with another Flink job) from RocksDB or HDFS while there's an updatable state "pending" on it? Should I ensure that state updates are not possible until the other Flink job has finished reading the persisted data?

 

And another question... I've tried to draft such a process and basically I have the following code:

 

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
    .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {

      private transient ValueState<MyGroupedObj> state;

      @Override
      public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
        MyGroupedObj current = state.value();
        if (current == null) {
          current = new MyGroupedObj();
        }
        // ...
        current.addTuple(t);
        // ...
        state.update(current);
        out.collect(current);
      }

      @Override
      public void open(Configuration config) {
        ValueStateDescriptor<MyGroupedObj> descriptor =
            new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
        state = getRuntimeContext().getState(descriptor);
      }
    });
groupedObj.print();

 

but obviously this way I emit the updated object on every update while, actually, I just want to persist the ValueState somehow (and make it available to another job that runs once a month, for example). Is that possible?

 

 

On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

Hi Flavio,

 

From what I understand, for the first part you are correct. You can use Flink's internal state to keep your enriched data.

In fact, if you are also querying an external system to enrich your data, it is worth looking at the AsyncIO feature:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

 

Now for the second part, currently in Flink you cannot iterate over all registered keys for which you have state. A pointer that may be useful to look at is the queryable state:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html

 

This is still an experimental feature, but let us know your opinion if you use it.
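For illustration, a minimal sketch of exposing keyed state this way, assuming the Flink 1.2 asQueryableState sink variant; note that with a ValueStateDescriptor the queryable state holds the latest element per key, and it serves point lookups only, not scans:

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;

// Makes the latest update per key queryable from outside the job under
// the name "latest-update".
QueryableStateStream<Tuple, Tuple4> queryable = tuples
    .keyBy(0)
    .asQueryableState(
        "latest-update",
        new ValueStateDescriptor<>("latest", TypeInformation.of(Tuple4.class)));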

 

Finally, an alternative would be to keep the state in Flink, and periodically flush it to an external storage system, which you can query at will.

 

Thanks,

Kostas

 

 

On May 16, 2017, at 4:38 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

 

Hi to all,

we're still playing with the Flink streaming part in order to see whether it can improve our current batch pipeline.

At the moment, we have a job that translates incoming data (as Row) into Tuple4, groups them together by the first field and persists the result to disk (using a thrift object). When we need to add tuples to those grouped objects we need to read the persisted data again, flatten it back to Tuple4, union it with the new tuples, re-group by key and finally persist.

 

This is very expensive to do with batch computation, while it should be pretty straightforward to do with streaming (from what I understood): I just need to use ListState. Right?
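A minimal sketch of what the ListState variant could look like; the types reuse the raw Tuple4 from the code earlier in the thread, and when (or whether) to emit downstream is deliberately left out, since that is the open question discussed above:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keeps the per-key list of tuples in ListState: an update becomes a cheap
// append instead of the batch read-union-regroup-persist cycle.
public class AccumulatingFunction extends RichFlatMapFunction<Tuple4, MyGroupedObj> {

  private transient ListState<Tuple4> tuplesState;

  @Override
  public void open(Configuration config) {
    tuplesState = getRuntimeContext().getListState(new ListStateDescriptor<>(
        "tuples", TypeInformation.of(Tuple4.class)));
  }

  @Override
  public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
    tuplesState.add(t); // append-only; nothing is emitted per element
  }
}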

Then, let's say I need to scan all the data of the stateful computation (keys and values) in order to do some other computation. I'd like to know:

  • how to do that, i.e. create a DataSet/DataSource<Key, Value> from the stateful data in the stream
  • is there any problem with accessing the stateful data without stopping incoming data (and thus possible updates to the states)?

Thanks in advance for the support,

Flavio

 

 



 

--

Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908




