apex-users mailing list archives

From Pramod Immaneni <pra...@datatorrent.com>
Subject Re: Kinesis Operator Help
Date Wed, 17 Feb 2016 18:50:06 GMT
Jim,

Is there state in the rest of the operators in your application? If so,
storing the offsets only for the input operator(s) is not sufficient, as
you will lose the state of the other operators in your DAG. Second,
modifying the idempotent storage manager to store the offsets in an external
location, while it can be done, will not always result in correct behavior.
This is because the state of the input operator is saved using the
idempotent storage manager as soon as the input operator reaches the end of
a window, but the operators downstream in the DAG may not yet have seen or
processed that data. So if you restart directly from these offsets, you can
lose data.
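To make the data-loss scenario concrete, here is a small, hypothetical Java illustration. It is not Apex API, and the operator names and window numbers are made up. It treats the committed window as the lowest window that every operator has finished. This is a simplification, because Apex ties commit to checkpoints. The code counts how many windows the slowest downstream operator would miss if a restart resumed from the input operator's own saved window.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical illustration (not Apex API): which windows are skipped if a
 * restart resumes from the input operator's own last saved window instead of
 * the committed window (simplified here as the lowest window every operator
 * has finished).
 */
public class WindowLossDemo {

    /** Simplified committed window: the minimum completed window across all operators. */
    static long committedWindow(Map<String, Long> lastCompletedWindowByOperator) {
        return lastCompletedWindowByOperator.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElseThrow(IllegalStateException::new);
    }

    /** Windows the slowest downstream operator never sees when resuming after inputSavedWindow. */
    static long windowsLost(long inputSavedWindow, long committedWindow) {
        return Math.max(0, inputSavedWindow - committedWindow);
    }

    public static void main(String[] args) {
        Map<String, Long> progress = new TreeMap<>();
        progress.put("kinesisInput", 10L); // input operator has finished window 10
        progress.put("ediRouter", 8L);
        progress.put("operatorEs", 7L);    // slowest downstream operator

        long committed = committedWindow(progress);
        long lost = windowsLost(progress.get("kinesisInput"), committed);
        System.out.println("committed window: " + committed);                          // 7
        System.out.println("windows lost if resuming from input offsets: " + lost);    // 3 (windows 8-10)
    }
}
```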

Launching the application with the previous application id (the
-originalAppId flag) handles this correctly: it uses the saved state of all
the operators and starts the input operator at a window that does not cause
any loss of data. There is one limitation: when you relaunch the
application, the application code must not have changed in a way that makes
the operators no longer deserializable from the previous state. If that is
not a problem for you, for example if you are not changing code between
reruns, then this approach is the best.

If you cannot launch from the previous application state as described above,
for example because your application code is going to change significantly,
and you don't have state in the rest of the application, you can save the
offsets externally and load them back at startup. In that case you want to
store the offsets from the committed window, which are guaranteed to have
been processed by all downstream operators in the DAG. This is a more
complicated approach, as the operator has to keep track of the committed
offsets and save them. I would recommend launching with the previous
application id if possible.
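The committed-offset bookkeeping described above can be sketched in plain Java so that it runs on its own. The class, the `OffsetStore` interface and the method names are hypothetical. In an actual Apex operator, `onEndWindow` would presumably be called from `endWindow()` and `onCommitted` from the `committed(long windowId)` callback of `Operator.CheckpointListener`. Offsets are snapshotted per window but only handed to the external store once their window is committed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch: remembers the Kinesis shard offsets read in each window
 * and persists them externally only after that window is committed.
 */
public class CommittedOffsetTracker {

    /** Hypothetical external store (database, S3, ...). */
    public interface OffsetStore {
        void save(Map<String, String> shardOffsets);
    }

    // windowId -> snapshot of (shardId -> last sequence number) at the end of that window
    private final NavigableMap<Long, Map<String, String>> pending = new TreeMap<>();
    private final Map<String, String> current = new HashMap<>();
    private final OffsetStore store;

    public CommittedOffsetTracker(OffsetStore store) {
        this.store = store;
    }

    /** Record the latest sequence number read from a shard. */
    public void onRecord(String shardId, String sequenceNumber) {
        current.put(shardId, sequenceNumber);
    }

    /** Snapshot offsets at the end of a window; nothing is persisted yet. */
    public void onEndWindow(long windowId) {
        pending.put(windowId, new HashMap<>(current));
    }

    /** Persist the newest snapshot at or before the committed window, then drop older ones. */
    public void onCommitted(long committedWindowId) {
        Map.Entry<Long, Map<String, String>> entry = pending.floorEntry(committedWindowId);
        if (entry == null) {
            return; // no completed window is committed yet
        }
        store.save(entry.getValue());
        pending.headMap(committedWindowId, true).clear();
    }

    public int pendingWindows() {
        return pending.size();
    }

    public static void main(String[] args) {
        Map<String, String> saved = new HashMap<>();
        CommittedOffsetTracker tracker = new CommittedOffsetTracker(saved::putAll);
        tracker.onRecord("shardId-000000000000", "100");
        tracker.onEndWindow(1);
        tracker.onRecord("shardId-000000000000", "250");
        tracker.onEndWindow(2);
        tracker.onCommitted(1); // only window 1 is known to be processed downstream
        System.out.println(saved);                    // {shardId-000000000000=100}
        System.out.println(tracker.pendingWindows()); // 1
    }
}
```

On startup, the operator would read the last saved offsets from the store and resume each shard after them. Window 2's offsets are never persisted until window 2 itself is committed.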

Thanks

On Tue, Feb 16, 2016 at 11:40 AM, Jim <jim@facility.supplies> wrote:

> Good morning,
>
> I am new to Apex, Hadoop and YARN (nothing like tackling something new, is
> there?).
>
> I have my first Apex apps working. They are EDI processors that read new
> EDI transactions from an Amazon Kinesis stream, look at the data, and
> route the EDI data to an appropriate handler for processing (note that
> operatorEs pushes the data to Elasticsearch for logging). Here is a
> diagram:
>
> [image: application diagram]
>
> Everything launches and works fine with the above diagram, from the
> EDI router through the transaction operators.
>
> The final challenge I am having, being new to all of this, is that the
> Kinesis operator, by default, stores its app id in the
> IdempotentStorageManager (aka WindowDataManager) when it is launched, so if
> the app is shut down and restarted, the same app id is used by default with
> the checkpoint, so you don’t reprocess the same records again when the
> application is restarted.
>
> You can see this id immediately to the right of Operations / apps, in
> gray lettering (‘application_1453741656046_0520’), in the image from the
> DataTorrent console below:
>
> [image: DataTorrent console screenshot]
>
> However, if you kill the application and relaunch it, this id changes and
> it starts reading the Kinesis stream from the beginning. The only way to
> restart it so that it picks up where it left off is to use the CLI as
> follows:
>
> 1.)    Run ‘dtcli’ from the command line.
>
> 2.)    Run ‘launch -originalAppId “application_1453741656046_0520” <path
> to .apa file>’
>
> This will launch the application using the same app id identified in the
> console screen above.
>
> I want to make this easier, but need some expert help in tweaking this so
> it works.
>
> I am thinking that there should be a way with Kinesis to:
>
> 1.)    Define a Kinesis app id string value in the properties.
>
> 2.)    If this value is defined, use it when launching the application to
> check whether a Hadoop app id has already been assigned to that
> identifier.
>
> 3.)    If that value is not yet stored in the database, launch the app,
> creating a new app id, and store the app id under the identifier key
> value.
>
> 4.)    Now if I kill the app, or install new software, it will always
> pick up where it left off by using the identifier key value to retrieve and
> assign the app id.
>
> Sounds simple, right? :)
>
> Can one of the experts out there help me figure this out, as I don’t want
> to reprocess already processed EDI transactions?
>
> Thanks,
>
> Jim
>
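Jim's proposed steps 1 to 4 amount to a lookup table from a stable, user-chosen name to the last YARN application id. A minimal Java sketch follows. The registry is an in-memory map standing in for his database, and the names `AppIdRegistry`, `edi-router` and `edi.apa` are hypothetical. The sketch only builds the `dtcli` command string shown in the message. It assumes that each launch, including a relaunch with `-originalAppId`, receives a new application id, so the caller must record the new id after every launch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch: maps a stable logical name to the last YARN application
 * id and builds the dtcli launch command from the message
 * (launch -originalAppId "<id>" <path to .apa file>).
 * Assumption: every launch gets a new application id, so recordLaunch must be
 * called after each launch with the id YARN assigned.
 */
public class AppIdRegistry {

    // Stand-in for a database table: logical name -> last application id
    private final Map<String, String> lastAppIdByName = new HashMap<>();

    /** Last application id recorded for a logical name, if any. */
    public Optional<String> lastAppId(String logicalName) {
        return Optional.ofNullable(lastAppIdByName.get(logicalName));
    }

    /** Build the dtcli launch command, resuming from prior state when an id is known. */
    public String launchCommand(String logicalName, String apaPath) {
        return lastAppId(logicalName)
                .map(id -> "launch -originalAppId \"" + id + "\" " + apaPath)
                .orElse("launch " + apaPath);
    }

    /** Record the application id assigned to the launch that just started. */
    public void recordLaunch(String logicalName, String newAppId) {
        lastAppIdByName.put(logicalName, newAppId);
    }

    public static void main(String[] args) {
        AppIdRegistry registry = new AppIdRegistry();
        System.out.println(registry.launchCommand("edi-router", "edi.apa"));
        // prints: launch edi.apa
        registry.recordLaunch("edi-router", "application_1453741656046_0520");
        System.out.println(registry.launchCommand("edi-router", "edi.apa"));
        // prints: launch -originalAppId "application_1453741656046_0520" edi.apa
    }
}
```

As the reply points out, relaunching this way still requires that the operators remain deserializable from the previous state. It therefore does not lift the limitation on changing application code between runs.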
