flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Simple stateful polling source
Date Wed, 19 Jun 2019 10:33:16 GMT
Hi all,
in my use case I have to ingest data from a REST service, which I poll
periodically (a queue would of course be a better choice, but that is not
my decision).

So I wrote a RichSourceFunction that starts a thread that polls for new data.
However, I'd like to resume from the last "from" value if the job is
stopped and restarted.

My initial thought was to write the last used date somewhere and, on job
restart, read it back (from a file, for example). However, a Flink stateful
source should be a better choice here... am I wrong? So I made my source
function implement ListCheckpointed<String>:

@Override
public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
    return Collections.singletonList(pollingThread.getDateFromAsString());
}

@Override
public void restoreState(List<String> state) throws Exception {
    for (String dateFrom : state) {
        startDateStr = dateFrom;
    }
}

@Override
public void run(SourceContext<MyEvent> ctx) throws Exception {
    final Object lock = ctx.getCheckpointLock();
    Client httpClient = getHttpClient();
    try {
        pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
                .setStartDate(startDateStr, datePatternStr)
                .build();
        // start the polling thread
        new Thread(pollingThread).start();
        .... (etc)
}
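
To make the intent concrete, here is a minimal plain-Java sketch of the
behaviour I am after. It does not use any Flink classes: the lock object
stands in for ctx.getCheckpointLock(), the list stands in for
ctx.collect(...), and every name in it (PollingSourceSketch, emitBatch,
getEmitted, ...) is hypothetical. The assumption is that the polling thread
emits records and advances the "from" date under the same lock that guards
the snapshot, so a snapshot never contains a date that is ahead of or behind
the records already emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the polling source; no Flink dependency. */
class PollingSourceSketch {
    // Stand-in for the object returned by ctx.getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Stand-in for the records handed to ctx.collect(...).
    private final List<String> emitted = new ArrayList<>();
    // Last "from" value; only read or written while holding checkpointLock.
    private String dateFrom;

    PollingSourceSketch(String initialDateFrom) {
        this.dateFrom = initialDateFrom;
    }

    /** Called by the polling thread after each successful REST call. */
    void emitBatch(List<String> records, String nextDateFrom) {
        synchronized (checkpointLock) {
            // Emit and advance the offset as one atomic step.
            emitted.addAll(records);
            dateFrom = nextDateFrom;
        }
    }

    /** Mirrors snapshotState: returns the current "from" value. */
    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return Collections.singletonList(dateFrom);
        }
    }

    /** Mirrors restoreState: the last element of the list wins. */
    void restoreState(List<String> state) {
        synchronized (checkpointLock) {
            for (String restored : state) {
                dateFrom = restored;
            }
        }
    }

    String getDateFrom() {
        synchronized (checkpointLock) {
            return dateFrom;
        }
    }

    List<String> getEmitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    public static void main(String[] args) {
        PollingSourceSketch beforeStop = new PollingSourceSketch("2019-06-01");
        beforeStop.emitBatch(Arrays.asList("event-1", "event-2"), "2019-06-02");
        List<String> snapshot = beforeStop.snapshotState();

        // A fresh instance simulates the restarted job.
        PollingSourceSketch afterRestart = new PollingSourceSketch("2019-06-01");
        afterRestart.restoreState(snapshot);
        System.out.println("resuming from " + afterRestart.getDateFrom());
    }
}
```

In the real source, emitBatch would correspond to the polling thread calling
ctx.collect(...) inside synchronized (lock), and the restored date would be
passed to setStartDate(...) when the polling thread is built in run().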

Is this the correct approach, or did I misunderstand how stateful source
functions work?

Best,
Flavio
