flink-user mailing list archives

From vipul singh <neoea...@gmail.com>
Subject Re: Issues in recovering state from last crash using custom sink
Date Mon, 28 Aug 2017 18:14:09 GMT
Hi Aljoscha,

Yes.
I run the application until a few checkpoints have completed, then stop it
between two checkpoints, so there are messages in the list state that should
be checkpointed when *snapshot* is called. I can see a checkpoint file on S3
(I am saving the checkpoints to S3 using the RocksDB state backend). On
restarting the application, I set a breakpoint here
<https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L49>,
to see whether there are any messages in checkpointedMessages, but the list
is empty.

Do you think there might be an error in the way I am trying to retrieve
messages?


def snapshotState(context: FunctionSnapshotContext): Unit = {
  // Replace the checkpointed list with the messages buffered so far
  checkpointedMessages.clear()
  bufferredMessages.foreach(checkpointedMessages.add)
  pendingFiles.synchronized {
    if (pendingFiles.nonEmpty) {
      // we have a list of pending files
      // we move all files to S3 (that's the sink in our case)
      // and after that we delete these files
    }
    pendingFiles.clear()
  }
}

def initializeState(context: FunctionInitializationContext): Unit = {
  // Check if files already exist in /tmp;
  // this can happen if the program crashed before these files were
  // uploaded to S3.
  // We need to recover (upload these files to S3 and clear the directory)
  handlePreviousPendingFiles()
  checkpointedMessages = context.getOperatorStateStore.getListState(
    new ListStateDescriptor[Message](STATE_UID,
      TypeInformation.of(new TypeHint[Message]() {})))
  import scala.collection.JavaConversions._
  for (message <- checkpointedMessages.get) {
    bufferredMessages.add(message)
  }
}

My understanding of the code above is that, on checkpointing, the messages
contained in checkpointedMessages are included in the snapshot, and that when
*initializeState* is called it tries to recover these messages from the
last checkpoint. Is that correct?
Do you think the error is in the way I am trying to get the list elements
from the last checkpoint?

checkpointedMessages = context.getOperatorStateStore.getListState(
  new ListStateDescriptor[Message](STATE_UID,
    TypeInformation.of(new TypeHint[Message]() {})))
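
The snapshot/restore contract described above can be sketched outside Flink.
This is only an illustration under stated assumptions: FakeListState,
BufferingSink and RecoveryDemo are hypothetical stand-ins, not Flink classes,
and the "restart" simply hands the same state object to a new sink. It shows
that only messages present at the last snapshot come back on restore, and
only if the restarted sink is actually given the state from that snapshot.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an operator list state; NOT the Flink API.
final class FakeListState[T] {
  private val items = ArrayBuffer.empty[T]
  def clear(): Unit = items.clear()
  def add(value: T): Unit = items += value
  def get: Seq[T] = items.toList
}

// Hypothetical sink mirroring the snapshotState / initializeState pair.
final class BufferingSink(state: FakeListState[String]) {
  val bufferedMessages = ArrayBuffer.empty[String]

  def invoke(message: String): Unit = bufferedMessages += message

  // On checkpoint: replace the state contents with the current buffer.
  def snapshotState(): Unit = {
    state.clear()
    bufferedMessages.foreach(state.add)
  }

  // On restore: refill the buffer from whatever the state holds.
  def initializeState(): Unit =
    state.get.foreach(m => bufferedMessages += m)
}

object RecoveryDemo {
  // Returns the buffer seen by a fresh sink after a simulated restart.
  def run(): Seq[String] = {
    val state = new FakeListState[String]
    val before = new BufferingSink(state)
    before.invoke("m1")
    before.invoke("m2")
    before.snapshotState()
    before.invoke("m3") // arrives after the last snapshot, so it is not in the state

    val after = new BufferingSink(state) // simulated restart with the same state
    after.initializeState()
    after.bufferedMessages.toList
  }
}
```

If the restarted sink were handed a fresh, empty FakeListState instead, the
recovered buffer would be empty, which is the symptom described above.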

Please let me know!

Thanks,
Vipul

On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
>
> How are you testing the recovery behaviour? Are you taking a savepoint,
> then shutting down, and then restarting the Job from the savepoint?
>
> Best,
> Aljoscha
>
> On 28. Aug 2017, at 00:28, vipul singh <neoeahit@gmail.com> wrote:
>
> Hi all,
>
> I am working on a Flink archiver application. In short, this application
> reads a bunch of schematized messages from Kafka and archives them to S3.
> Due to the nature of the naming of the files, I had to write a custom sink
> implementation. So far the application is generally able to archive files
> to S3.
> I am having some issues during the recovery phase. A sample of the code
> can be found at
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe>. My
> issue is that on recovery, when initializeState is called, it is not able
> to recover the last checkpointed ListState (i.e. checkpointedMessages
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L47>
> is empty). I think this might be because of the way I am retrieving the
> checkpointed messages. Could someone please point out what is wrong, or
> direct me to some examples that do a similar thing? (Please note that the
> Message
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L2>
> class is our own implementation.)
>
> Thanks,
> Vipul
>
>
>


-- 
Thanks,
Vipul
