apex-dev mailing list archives

From Chandni Singh <chan...@datatorrent.com>
Subject Re: Operator checkpointing in distributed in-memory store
Date Fri, 04 Dec 2015 17:57:27 GMT
Ashish,

This sounds very interesting and useful. Looking forward to the
implementation.

Chandni

On Fri, Dec 4, 2015 at 9:49 AM, Ashish Tadose <ashishtadose@gmail.com>
wrote:

> Gaurav, Sandesh
>
> PFB my comments in *bold*
>
> 1. Are there standard APIs for distributed In-Memory stores or is this
> implementation specific to one particular tool?
> *I have developed a concrete implementation with Apache Geode -
> http://geode.incubator.apache.org/*
> *However, for this feature contribution I am adding a KeyValue store
> interface and an abstract implementation to plug in any KeyValue store as a
> storage agent.*
>
> 2. Will In-Memory Store compete with DataTorrent Apps for cluster resources
> (memory/cpu)?
> *Probably not. The in-memory store would be a separately managed cluster
> which need not be part of the YARN environment.*
>
> 3. What is the purging policy? Who is responsible for cleaning up the
> resources for completed/failed/aborted applications? This becomes important
> when you want to launch an Application using a previous Application Id
> *The in-memory storage would support checkpoint deletion, which the
> platform calls periodically during the application lifetime.*
> *Purging the checkpoints of older applications will be taken care of by the
> application developer or the admin managing the in-memory cluster; the same
> is the case with HDFS storage agents, where the user has to manually delete
> old apps' checkpoint data.*
>
> 4. Which in-memory stores did you evaluate? Are they YARN compatible?
> *I have a concrete implementation of a Geode storage agent which I would be
> contributing along with this feature.*
>
> Thanks,
> Ashish
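Ashish's answer to question 1 describes a generic KeyValue store interface plus an abstract implementation, so any key-value store can be plugged in as a storage agent. A minimal sketch of what such an abstraction might look like follows; the interface and class names are illustrative assumptions, not the actual contributed API:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical key-value abstraction for pluggable checkpoint storage;
// names are illustrative, not the actual Apex or Geode API.
interface KeyValueStore {
    void put(String key, byte[] value);
    byte[] get(String key);   // returns null if the key is absent
    void remove(String key);
    Set<String> keys();
}

// In-memory stand-in for a distributed store such as Apache Geode.
class InMemoryKeyValueStore implements KeyValueStore {
    private final Map<String, byte[]> data = new ConcurrentHashMap<>();

    @Override public void put(String key, byte[] value) { data.put(key, value); }
    @Override public byte[] get(String key) { return data.get(key); }
    @Override public void remove(String key) { data.remove(key); }
    @Override public Set<String> keys() { return data.keySet(); }
}
```

A Geode-backed implementation would replace the map with region operations, while an abstract storage agent written against the interface would stay unchanged.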
>
>
> On Fri, Dec 4, 2015 at 12:45 AM, Sandesh Hegde <sandesh@datatorrent.com>
> wrote:
>
> > Ashish,
> >
> > Two more questions for you,
> > Which in-memory stores did you evaluate? Are they YARN compatible?
> >
> > Thank you for your contribution.
> >
> > Sandesh
> >
> > On Wed, Dec 2, 2015 at 10:53 AM Gaurav Gupta <gaurav@datatorrent.com>
> > wrote:
> >
> > > Ashish,
> > >
> > > I have couple of questions
> > > 1. Are there standard APIs for distributed In-Memory stores or is this
> > > implementation specific to one particular tool?
> > > 2. Will In-Memory Store compete with DataTorrent Apps for cluster
> > > resources (memory/cpu)?
> > > 3. What is the purging policy? Who is responsible for cleaning up the
> > > resources for completed/failed/aborted applications? This becomes
> > > important when you want to launch an Application using a previous
> > > Application Id
> > >
> > > Thanks
> > > - Gaurav
> > >
> > > > On Dec 2, 2015, at 10:07 AM, Ashish Tadose <ashishtadose@gmail.com>
> > > > wrote:
> > > >
> > > > Thanks Gaurav,
> > > >
> > > > I have finished baseline implementations of the StorageAgent and also
> > > > tested it with demo applications by explicitly specifying it in the
> > > > DAG configuration as below, and it works fine.
> > > >
> > > > dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> > > >
> > > > I also had to make some changes to StramClient to pass additional
> > > > information such as the applicationId, as it isn't passed currently.
> > > >
> > > > I am going to create a JIRA task for this feature and will document
> > > > the design & implementation strategy there.
> > > >
> > > > Thx,
> > > > Ashish
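The checkpoint contract behind such a StorageAgent (save per operator and window, load on recovery, delete on commit) could be sketched roughly as below, keyed by "appId/operatorId/windowId" since the applicationId is the extra information Ashish mentions passing through StramClient. The in-memory map, key scheme, and method names are assumptions mirroring the thread's description, not the actual Apex StorageAgent API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a checkpoint agent over a key-value store. A real agent would
// delegate to a distributed store (e.g. Geode) instead of a local map.
class KeyValueCheckpointAgent {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();
    private final String appId;

    KeyValueCheckpointAgent(String appId) { this.appId = appId; }

    private String key(int operatorId, long windowId) {
        return appId + "/" + operatorId + "/" + windowId;
    }

    // Persist serialized operator state for a checkpoint window.
    void save(byte[] state, int operatorId, long windowId) {
        store.put(key(operatorId, windowId), state);
    }

    // Fetch state during operator recovery; null if no such checkpoint.
    byte[] load(int operatorId, long windowId) {
        return store.get(key(operatorId, windowId));
    }

    // Called periodically by the platform to purge committed checkpoints.
    void delete(int operatorId, long windowId) {
        store.remove(key(operatorId, windowId));
    }

    // List the checkpointed windows available for an operator.
    List<Long> getWindowIds(int operatorId) {
        String prefix = appId + "/" + operatorId + "/";
        List<Long> windows = new ArrayList<>();
        for (String k : store.keySet()) {
            if (k.startsWith(prefix)) {
                windows.add(Long.parseLong(k.substring(prefix.length())));
            }
        }
        return windows;
    }
}
```

The periodic `delete` call is what answers question 3 above for checkpoints of the running application; checkpoints of finished applications would still need external purging by an admin.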
> > > >
> > > >
> > > > On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <gaurav@datatorrent.com>
> > > > wrote:
> > > >
> > > >> Just to add, you can plug in your storage agent using the attribute
> > > >> STORAGE_AGENT (
> > > >> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
> > > >> )
> > > >>
> > > >> Thanks
> > > >> - Gaurav
> > > >>
> > > >>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <gaurav@datatorrent.com>
> > > wrote:
> > > >>>
> > > >>> Ashish,
> > > >>>
> > > >>> You are right that Exactly once semantics can't be achieved through
> > > >>> async FS writes.
> > > >>> Did you try the new StorageAgent with your application? If yes, do
> > > >>> you have any numbers to compare?
> > > >>>
> > > >>> Thanks
> > > >>> - Gaurav
> > > >>>
> > > >>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <ashishtadose@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>> The application uses a large number of in-memory dimension store
> > > >>>> partitions to hold high-cardinality aggregated data, and many
> > > >>>> intermediate operators also keep cached reference-lookup data that
> > > >>>> is non-transient.
> > > >>>>
> > > >>>> Total application partitions were more than 1000, which means a lot
> > > >>>> of operators to checkpoint and in turn a lot of frequent HDFS
> > > >>>> write, rename & delete operations, which became a bottleneck.
> > > >>>>
> > > >>>> The application requires Exactly once semantics with idempotent
> > > >>>> operators, which I suppose cannot be achieved through async FS
> > > >>>> writes; please correct me if I'm wrong here.
> > > >>>>
> > > >>>> Also, the application computes streaming aggregations over
> > > >>>> high-cardinality incoming data streams, and the reference caches
> > > >>>> are updated frequently, so I'm not sure how much incremental
> > > >>>> checkpointing will help here.
> > > >>>>
> > > >>>> Beyond this specific application, I strongly think it would be good
> > > >>>> for the platform to offer a StorageAgent backed by a distributed
> > > >>>> in-memory store as an alternative.
> > > >>>>
> > > >>>> Ashish
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath
> > > >>>> <ram@datatorrent.com> wrote:
> > > >>>>
> > > >>>>> Ashish,
> > > >>>>>
> > > >>>>> In the current release, the HDFS writes are asynchronous, so I'm
> > > >>>>> wondering if you could elaborate on how much latency you are
> > > >>>>> observing both with and without checkpointing (i.e. after your
> > > >>>>> changes to make operators stateless).
> > > >>>>>
> > > >>>>> Any information on how much non-transient data is being
> > > >>>>> checkpointed in each operator would also be useful. There is an
> > > >>>>> effort under way to implement incremental checkpointing, which
> > > >>>>> should improve things when there is a lot of state but very little
> > > >>>>> that changes from window to window.
> > > >>>>>
> > > >>>>> Ram
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose
> > > >>>>> <ashishtadose@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Hi All,
> > > >>>>>>
> > > >>>>>> Currently the Apex engine provides operator checkpointing in HDFS
> > > >>>>>> (with HDFS-backed StorageAgents, i.e. FSStorageAgent &
> > > >>>>>> AsyncFSStorageAgent).
> > > >>>>>>
> > > >>>>>> We have observed that for applications having a large number of
> > > >>>>>> operator instances, HDFS checkpointing introduces latency in the
> > > >>>>>> DAG which degrades overall application performance.
> > > >>>>>> To resolve this we had to review all operators in the DAG and
> > > >>>>>> make a few operators stateless.
> > > >>>>>>
> > > >>>>>> As operator checkpointing is critical functionality of the Apex
> > > >>>>>> streaming platform to ensure fault-tolerant behavior, the
> > > >>>>>> platform should also provide alternate StorageAgents which work
> > > >>>>>> seamlessly with large applications that require Exactly once
> > > >>>>>> semantics.
> > > >>>>>>
> > > >>>>>> HDFS read/write latency is limited and doesn't improve beyond a
> > > >>>>>> certain point because of disk I/O & staging writes. An alternate
> > > >>>>>> strategy that checkpoints to a fault-tolerant distributed
> > > >>>>>> in-memory grid would ensure application stability and performance
> > > >>>>>> are not impacted.
> > > >>>>>>
> > > >>>>>> I have developed an in-memory storage agent which I would like to
> > > >>>>>> contribute as an alternate StorageAgent for checkpointing.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> Ashish
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > > >>
> > >
> > >
> >
>
