flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Simple stateful polling source
Date Wed, 19 Jun 2019 10:40:03 GMT
This looks fine to me.

What exactly were you worried about?

On 19/06/2019 12:33, Flavio Pompermaier wrote:
> Hi all,
> in my use case I have to ingest data from a REST service that I
> poll periodically (of course a queue would be a better choice,
> but that isn't up to me).
>
> So I wrote a RichSourceFunction that starts a thread that polls for new
> data.
> However, I'd like to restart from the last "from" value (in case
> the job is stopped).
>
> My initial thought was to persist the last used date somewhere and, on
> job restart, read that date back (from a file, for example). However, a
> Flink stateful source should be a better choice here... am I wrong? So I
> made my source function implement ListCheckpointed<String>:
>
> @Override
> public List<String> snapshotState(long checkpointId, long timestamp)
>         throws Exception {
>     // checkpoint the current "from" date of the polling thread
>     return Collections.singletonList(pollingThread.getDateFromAsString());
> }
>
> @Override
> public void restoreState(List<String> state) throws Exception {
>     // called before run(); keep the (last) restored "from" date
>     for (String dateFrom : state) {
>         startDateStr = dateFrom;
>     }
> }
>
> @Override
> public void run(SourceContext<MyEvent> ctx) throws Exception {
>     final Object lock = ctx.getCheckpointLock();
>     Client httpClient = getHttpClient();
>     try {
>         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
>                 .setStartDate(startDateStr, datePatternStr)
>                 .build();
>         // start the polling thread
>         new Thread(pollingThread).start();
>         .... (etc)
> }
>
> Is this the correct approach, or did I misunderstand how stateful
> source functions work?
>
> Best,
> Flavio
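For readers landing on this thread: the part of this pattern that usually needs care is keeping the checkpointed "from" value consistent with the records already emitted. The SourceFunction javadoc asks that element emission and state updates happen inside a synchronized block on ctx.getCheckpointLock(), so they cannot interleave with a checkpoint. The sketch below is a plain-Java model with no Flink dependency, so it runs on its own; the class PollingSourceModel, its methods, and the fetchSince() stub (standing in for the REST call) are hypothetical and only mirror the shape of the snippet above, not Flink's actual API.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of a checkpointed polling source (no Flink dependency).
// All names are hypothetical; checkpointLock stands in for ctx.getCheckpointLock().
public class PollingSourceModel {
    private final Object checkpointLock = new Object();
    // Next "from" date to fetch; written only by the polling thread or by restoreState()
    private LocalDate dateFrom;
    private final List<String> emitted = new ArrayList<>();

    public PollingSourceModel(LocalDate configuredStart) {
        this.dateFrom = configuredStart;
    }

    // Mirrors ListCheckpointed#restoreState: called before polling starts.
    // An empty list means a fresh start; otherwise the latest restored date wins.
    public void restoreState(List<String> state) {
        LocalDate restored = null;
        for (String s : state) {
            LocalDate d = LocalDate.parse(s);
            if (restored == null || d.isAfter(restored)) {
                restored = d;
            }
        }
        if (restored != null) {
            dateFrom = restored; // checkpointed value overrides the configured start
        }
    }

    // Mirrors ListCheckpointed#snapshotState: read the date under the lock so it
    // matches exactly the records emitted so far.
    public List<String> snapshotState() {
        synchronized (checkpointLock) {
            return Collections.singletonList(dateFrom.toString());
        }
    }

    // One polling round: fetch outside the lock, then emit and advance atomically.
    public void pollOnce() {
        List<String> batch = fetchSince(dateFrom);
        synchronized (checkpointLock) {
            emitted.addAll(batch);           // stands in for ctx.collect(...)
            dateFrom = dateFrom.plusDays(1); // advance only together with emission
        }
    }

    // Hypothetical stub for the REST call: one event per day.
    private static List<String> fetchSince(LocalDate from) {
        return Collections.singletonList("event@" + from);
    }

    public List<String> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    public static void main(String[] args) {
        PollingSourceModel source = new PollingSourceModel(LocalDate.parse("2019-06-01"));
        source.pollOnce();
        source.pollOnce();
        List<String> checkpoint = source.snapshotState();

        // Simulated restart: a fresh instance restores from the checkpoint.
        PollingSourceModel restarted = new PollingSourceModel(LocalDate.parse("2019-06-01"));
        restarted.restoreState(checkpoint);
        restarted.pollOnce();
        System.out.println(checkpoint + " " + restarted.emitted());
    }
}
```

Running main prints "[2019-06-03] [event@2019-06-03]": after two polls the checkpoint holds the next date to fetch, and the restarted instance resumes from it rather than from the configured start date. Keeping the HTTP call outside the synchronized block avoids stalling checkpoints on a slow request. In a real Flink source, run() should also not return while the polling thread is still active, since a source whose run() returns is treated as finished.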


