flink-user mailing list archives

From vipul singh <neoea...@gmail.com>
Subject Re: Issues in recovering state from last crash using custom sink
Date Mon, 28 Aug 2017 18:14:09 GMT
Hi Aljoscha,

I am running the application until a few checkpoints have completed. I stop
the application between two checkpoints, so there will be messages in the
list state, which should be checkpointed when *snapshotState* is called.
I can see a checkpoint file on S3 (I am saving the checkpoints to S3 using
the RocksDB state backend). On restarting the application, I set a
breakpoint in *initializeState* to see whether there are any messages in
checkpointedMessages, but the list is empty.
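
One point that affects this test: on recovery Flink restores the state of the last *completed* checkpoint, not the state at the moment the job was stopped. Messages that arrived after that checkpoint are in no snapshot. Assuming checkpointing is enabled, the Kafka consumer's offsets are stored in the same checkpoint, so those records are read from Kafka again after recovery. The Flink-free Scala sketch below models this. `Checkpoint`, `StopBetweenCheckpoints` and the count-based checkpoint interval are hypothetical simplifications (real checkpoints are triggered on a time interval), not Flink API.

```scala
// Hypothetical, Flink-free model: a completed checkpoint records the sink's
// buffer together with the source offset reached at that moment.
final case class Checkpoint(offset: Int, buffered: List[String])

object StopBetweenCheckpoints {
  // Reads `log` starting at `fromOffset`, with `initial` as the restored buffer.
  // A checkpoint completes after every `interval` records (a simplification),
  // and the run stops after `stopAfter` records to simulate the job being killed.
  // Returns the buffer at the moment of stopping and the last completed checkpoint.
  def run(log: Vector[String], fromOffset: Int, initial: List[String],
          interval: Int, stopAfter: Int): (List[String], Option[Checkpoint]) = {
    var buffer = initial
    var lastCompleted: Option[Checkpoint] = None
    var offset = fromOffset
    while (offset < log.length && offset - fromOffset < stopAfter) {
      buffer = buffer :+ log(offset) // the sink buffers every record it sees
      offset += 1
      if ((offset - fromOffset) % interval == 0)
        lastCompleted = Some(Checkpoint(offset, buffer))
    }
    (buffer, lastCompleted)
  }
}
```

With a log of `a` to `f`, a checkpoint every 2 records and a stop after 5, the last completed checkpoint holds `a` to `d` at offset 4. So `e` is not restored from state. Resuming from offset 4 re-reads it from the source.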

Do you think there might be an error in the way I am trying to retrieve the
state? Here is the relevant code:

    def snapshotState(context: FunctionSnapshotContext): Unit = {
      pendingFiles synchronized {
        if (pendingFiles.nonEmpty) {
          // we have a list of pending files:
          // we move all of them to S3 (that's the sink in our case)
          // and afterwards we delete these files
          // ... (rest of snapshotState not included in this message)
        }
      }
    }

    def initializeState(context: FunctionInitializationContext): Unit = {
      // Check if files already exist in /tmp; this might be the case if the
      // program crashed before these files were uploaded to S3.
      // We need to recover them (upload these files to S3) and clear the directory.
      handlePreviousPendingFiles()
      checkpointedMessages = context.getOperatorStateStore.getListState(
        new ListStateDescriptor[Message](STATE_UID,
          TypeInformation.of(new TypeHint[Message]() {})))
      import scala.collection.JavaConverters._
      for (message <- checkpointedMessages.get.asScala) {
        bufferredMessages.add(message)
      }
    }

From my understanding of the above code, on checkpointing the messages
contained in checkpointedMessages are written to the snapshot, and when
*initializeState* is called, it will recover these messages from the last
checkpoint. Is that correct?
Do you think the error is in the way I am trying to get the ListBuffer
elements from the last checkpoint, i.e. in this call?

    checkpointedMessages = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Message](STATE_UID,
        TypeInformation.of(new TypeHint[Message]() {})))
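
For comparison, the `BufferingSink` example in the Flink documentation on managed operator state follows this contract. In `snapshotState` it calls `clear()` on the list state and then `add`s every buffered element. In `initializeState` it copies the list state back into the buffer when `context.isRestored` is true. The list state only contains what `snapshotState` explicitly put into it. Because the `snapshotState` quoted above is truncated, it is worth confirming that it writes the buffered messages into `checkpointedMessages`. The sketch below is a minimal Flink-free model of that contract, assuming a plain in-memory list stands in for the checkpoint. `ModelListState` and this `BufferingSink` class are hypothetical stand-ins, not the Flink classes.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, Flink-free model of operator ListState: a checkpoint stores
// only what snapshotState explicitly put into the state, nothing else.
final class ModelListState[T] {
  private val items = ListBuffer.empty[T]
  def clear(): Unit = items.clear()
  def add(value: T): Unit = items += value
  def get: List[T] = items.toList
}

// Hypothetical sink mirroring the snapshotState/initializeState pattern.
final class BufferingSink[T] {
  val buffered: ListBuffer[T] = ListBuffer.empty[T]
  private var checkpointed = new ModelListState[T]

  def invoke(value: T): Unit = buffered += value

  // Replace the state's contents with the current buffer (clear, then add
  // each element) and return what a checkpoint would persist.
  def snapshotState(): List[T] = {
    checkpointed.clear()
    buffered.foreach(checkpointed.add)
    checkpointed.get
  }

  // `restored` is None on a fresh start (in Flink: context.isRestored == false).
  def initializeState(restored: Option[List[T]]): Unit = {
    checkpointed = new ModelListState[T]
    restored.getOrElse(Nil).foreach(checkpointed.add)
    buffered.clear()
    buffered ++= checkpointed.get
  }
}
```

Calling `initializeState(None)` models a fresh start and leaves the buffer empty. A job also sees this when it is restarted without being pointed at a savepoint or retained checkpoint.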

Please let me know!


On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

> Hi,
> How are you testing the recovery behaviour? Are you taking a savepoint,
> then shutting down, and then restarting the job from the savepoint?
> Best,
> Aljoscha
> On 28. Aug 2017, at 00:28, vipul singh <neoeahit@gmail.com> wrote:
> Hi all,
> I am working on a Flink archiver application. In short, this application
> reads schematized messages from Kafka and archives them to S3. Because of
> the way the files need to be named, I had to write a custom sink. So far,
> the application is generally able to archive files to S3.
> I am having some issues during the recovery phase. A sample of the code
> can be found at
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe>. My
> issue is that on recovery, when initializeState is called, it is not able
> to recover the last checkpointed ListState (i.e. checkpointedMessages
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L47>
> comes back with 0 elements). I think this might be because of the way I am
> retrieving the checkpointed messages. Could someone please point out what
> is wrong, or direct me to some examples that do a similar thing? (Please
> note that Message
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L2>
> is our own implementation.)
> Thanks,
> Vipul

