flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: Stateful streaming question
Date Wed, 17 May 2017 08:18:19 GMT
Hi Flavio,

As for retries: unfortunately there is no such setting yet and, if I am not mistaken,
a failed request throws an exception and the job restarts. I am also including Till in
the thread, as he may know better.
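
Since there is no built-in retry setting, one option would be to handle retries in user
code around the asynchronous call itself. Below is a minimal sketch in plain Java, not
Flink API: AsyncRetry, withRetry and all of its parameters are hypothetical names, and
the doubling backoff is just one possible policy.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not part of Flink): retries an asynchronous call
// with exponential backoff before reporting the failure.
final class AsyncRetry {
    private AsyncRetry() {}

    static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> call,
            int maxAttempts,
            long initialBackoffMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, 1, maxAttempts, initialBackoffMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            int attemptNo,
            int maxAttempts,
            long backoffMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like an asynchronous one.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                // Out of attempts: surface the last error to the caller.
                result.completeExceptionally(error);
            } else {
                // Wait, then try again with a doubled backoff.
                scheduler.schedule(
                        () -> attempt(call, attemptNo + 1, maxAttempts,
                                backoffMillis * 2, scheduler, result),
                        backoffMillis, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

The returned future completes exceptionally only after the last attempt has failed, so
that would be the point at which the failure reaches the job.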

Consistency guarantees and concurrency control depend on your underlying backend.
But if you want to have end-to-end control, then you could do as Ankit suggested in his
point 3), i.e. have a single job for the whole pipeline (if this fits your needs, of course).
This will allow you to set your own “precedence” rules for your operations.

Now, there is currently no way to expose the state of a job directly to another job. The
options are either Queryable State or writing to a Sink. If the problem with having one job
is that you emit one element at a time, you can always group elements together and emit
downstream less often, in batches.
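
To illustrate the batching idea, here is a minimal sketch in plain Java, assuming a simple
count-based policy. BatchingBuffer is a hypothetical helper, not a Flink class; inside a
real Flink operator the buffer would have to live in checkpointed state, otherwise
buffered elements are lost on a restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not a Flink class): collects elements and hands them
// downstream as one list once batchSize elements have accumulated.
final class BatchingBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> downstream;
    private List<T> buffer = new ArrayList<>();

    BatchingBuffer(int batchSize, Consumer<List<T>> downstream) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    // Buffers one element and emits a batch when the buffer is full.
    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Emits whatever is buffered, e.g. at shutdown or on a timer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        downstream.accept(batch);
    }
}
```

With a batch size of 3, adding seven elements emits two batches and leaves one element
buffered until flush() is called.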
 
Finally, if you need 2 jobs, you can always use a hybrid solution where you keep your current
state in Flink and dump it, once per week for example, to a Sink that is queryable.
The Sink can then be queried at any time, and the data will be at most one week old.
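
The hybrid approach could be sketched as follows, again in plain Java rather than Flink
API. PeriodicStateDumper and its parameters are hypothetical, the clock is passed in
explicitly to keep the example deterministic, and a dump is only triggered when an update
arrives, so the staleness bound holds only while updates keep coming.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper (not a Flink class): keeps the latest value per key
// and writes a full snapshot to a sink at most once per interval.
final class PeriodicStateDumper<K, V> {
    private final Map<K, V> state = new HashMap<>();
    private final long intervalMillis;
    private final Consumer<Map<K, V>> sink;
    private long lastDumpMillis;

    PeriodicStateDumper(long intervalMillis, long startMillis, Consumer<Map<K, V>> sink) {
        this.intervalMillis = intervalMillis;
        this.lastDumpMillis = startMillis;
        this.sink = sink;
    }

    // Updates the state and dumps a copy of it if the interval has elapsed.
    void update(K key, V value, long nowMillis) {
        state.put(key, value);
        if (nowMillis - lastDumpMillis >= intervalMillis) {
            sink.accept(new HashMap<>(state)); // copy, so later updates do not leak in
            lastDumpMillis = nowMillis;
        }
    }
}
```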

Thanks,
Kostas

> On May 17, 2017, at 9:35 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Hi Ankit, just a brief comment on the "batch job is easier than streaming job" argument.
> I'm not sure about that.
> I can see that the batch job alone might seem easier to implement, but this is only one
> part of the whole story. The operational side of using batch is more complex IMO.
> You need a tool to ingest your stream, you need storage for the ingested data, you need
> a periodic scheduler to kick off your batch job, and you need to take care of failures if
> something goes wrong.
> In the streaming case, this is either not needed or the framework does it for you.
> 
> Just my 2 cents, Fabian
> 
> 2017-05-16 20:58 GMT+02:00 Jain, Ankit <ankit.jain@here.com>:
> Hi Flavio,
> 
> While you wait on an update from Kostas, I wanted to understand the use case better and
> share my thoughts:
> 
>  
> 
> 1)       Why is the current batch mode expensive? Where are you persisting the data after
> updates? The way I see it, by moving to Flink you get to use RocksDB (a key-value store),
> which makes your lookups faster. Right now you are probably using a non-indexed store
> like S3, maybe?
> 
> So the gain comes from moving to a persistence store better suited to your use case
> rather than from batch->streaming. Maybe consider just going with a different data store.
> 
> IMHO, streaming should only be used if you really want to act on the new events in
> real time. It is generally harder to get a streaming job correct than a batch one.
> 
>  
> 
> 2)       If the current setup is expensive due to serialization-deserialization, then that
> should be fixed by moving to a faster format (maybe Avro? I don’t have a lot of expertise
> in that). I don’t see how that problem will go away with Flink, so you still need to handle
> serialization.
> 
>  
> 
> 3)       Even if you do decide to move to Flink, I think you can do this with one
> job; two jobs are not needed. At every incoming event, check the previous state and
> update/output to Kafka or whatever data store you are using.
> 
>  
> 
>  
> 
> Thanks
> 
> Ankit
> 
>  
> 
> From: Flavio Pompermaier <pompermaier@okkam.it>
> Date: Tuesday, May 16, 2017 at 9:31 AM
> To: Kostas Kloudas <k.kloudas@data-artisans.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Stateful streaming question
> 
>  
> 
> Hi Kostas,
> 
> thanks for your quick response. 
> 
> I also thought about using Async IO; I just need to figure out how to correctly handle
> parallelism and the number of async requests.
> 
> However, that's probably the way to go. Is it also possible to set a number of retry
> attempts/backoff when the async request fails (maybe due to a too busy server)?
> 
>  
> 
> For the second part I think it's ok to persist the state into RocksDB or HDFS. My question
> is indeed about that: is it safe to start reading (with another Flink job) from RocksDB or
> HDFS while updatable state is still "pending" on it? Should I ensure that state updates are
> not possible until the other Flink job has finished reading the persisted data?
> 
>  
> 
> And another question... I've tried to draft such a process and basically I have the
> following code:
> 
>  
> 
> DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
>         .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {
> 
>           private transient ValueState<MyGroupedObj> state;
> 
>           @Override
>           public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
>             MyGroupedObj current = state.value();
>             if (current == null) {
>               current = new MyGroupedObj();
>             }
>             ....
>             current.addTuple(t);
>             ...
>             state.update(current);
>             out.collect(current);
>           }
> 
>           @Override
>           public void open(Configuration config) {
>             ValueStateDescriptor<MyGroupedObj> descriptor =
>                 new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
>             state = getRuntimeContext().getState(descriptor);
>           }
>         });
> groupedObj.print();
> 
>  
> 
> but obviously this way I emit the updated object on every update while, actually, I just
> want to persist the ValueState somehow (and make it available to another job that runs once
> a month, for example). Is that possible?
> 
>  
> 
>  
> 
> On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> 
> Hi Flavio,
> 
>  
> 
> From what I understand, for the first part you are correct. You can use Flink’s internal
> state to keep your enriched data.
> 
> In fact, if you are also querying an external system to enrich your data, it is worth
> looking at the AsyncIO feature:
> 
>  
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
>  
> 
> Now for the second part, currently in Flink you cannot iterate over all registered keys
> for which you have state. A pointer that may be useful to look at is the queryable state:
> 
>  
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html
>  
> 
> This is still an experimental feature, but let us know your opinion if you use it.
> 
>  
> 
> Finally, an alternative would be to keep state in Flink, and periodically flush it to
> an external storage system, which you can query at will.
> 
>  
> 
> Thanks,
> 
> Kostas
> 
>  
> 
>  
> 
> On May 16, 2017, at 4:38 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
> 
>  
> 
> Hi to all,
> 
> we're still playing with the streaming part of Flink in order to see whether it can improve
> our current batch pipeline.
> 
> At the moment, we have a job that translates incoming data (as Row) into Tuple4, groups
> them together by the first field and persists the result to disk (using a thrift object).
> When we need to add tuples to those grouped objects we need to read the persisted data
> again, flatten it back to Tuple4, union it with the new tuples, re-group by key and
> finally persist.
> 
>  
> 
> This is very expensive to do with batch computation, while it should be pretty
> straightforward to do with streaming (from what I understood): I just need to use
> ListState. Right?
> 
> Then, let's say I need to scan all the data of the stateful computation (keys and values)
> in order to do some other computation. I'd like to know:
> 
> 1) How to do that? I.e. create a DataSet/DataSource<Key,Value> from the stateful data
> in the stream.
> 2) Is there any problem with accessing the stateful data without stopping incoming data
> (and thus possible updates to the states)?
> 
> Thanks in advance for the support,
> 
> Flavio
> 
>  
> 
>  
> 
> 
> 
> 
>  
> 
> --
> 
> Flavio Pompermaier
> Development Department
> 
> OKKAM S.r.l.
> Tel. +(39) 0461 1823908

