apex-dev mailing list archives

From Gaurav Gupta <gau...@datatorrent.com>
Subject Re: Operator checkpointing in distributed in-memory store
Date Wed, 02 Dec 2015 18:52:57 GMT
Ashish,

I have a couple of questions:
1. Are there standard APIs for distributed in-memory stores, or is this
implementation specific to one particular tool?
2. Will the in-memory store compete with DataTorrent apps for cluster
resources (memory/CPU)?
3. What is the purging policy? Who is responsible for cleaning up the
resources of completed/failed/aborted applications? This becomes important
when you want to launch an application using a previous application id.

Thanks
- Gaurav

> On Dec 2, 2015, at 10:07 AM, Ashish Tadose <ashishtadose@gmail.com> wrote:
> 
> Thanks Gaurav,
> 
> I have finished a baseline implementation of the StorageAgent and tested it
> with demo applications by explicitly specifying it in the DAG configuration
> as below, and it works fine.
> 
> dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> 
> I also had to make some changes to StramClient to pass additional
> information such as the applicationId, as it isn't passed currently.
> 
> I am going to create a JIRA task for this feature and will document the
> design & implementation strategy there.
> 
> Thx,
> Ashish
> 
> 
> On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <gaurav@datatorrent.com> wrote:
> 
>> Just to add you can plugin your storage agent using attribute
>> STORAGE_AGENT (
>> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
>> )
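[For illustration, a minimal sketch of what an agent plugged in through that attribute could look like. It assumes the StorageAgent contract of save/load/delete/getWindowIds; the interface is re-declared here so the sketch is self-contained, and `InMemoryStorageAgent` is a hypothetical name. A production version would implement `com.datatorrent.api.StorageAgent` directly and back the map with a distributed in-memory grid rather than a local JVM map:]

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Re-declared locally so this sketch compiles on its own; the real
// contract is com.datatorrent.api.StorageAgent.
interface StorageAgent {
  void save(Object object, int operatorId, long windowId) throws IOException;
  Object load(int operatorId, long windowId) throws IOException;
  void delete(int operatorId, long windowId) throws IOException;
  long[] getWindowIds(int operatorId) throws IOException;
}

// Hypothetical in-memory agent: per-operator map of windowId -> checkpointed
// state. A real implementation would serialize the state and write it to a
// fault-tolerant distributed store instead of keeping it in the local heap.
class InMemoryStorageAgent implements StorageAgent {
  private final Map<Integer, ConcurrentSkipListMap<Long, Object>> store =
      new ConcurrentHashMap<>();

  @Override
  public void save(Object object, int operatorId, long windowId) {
    store.computeIfAbsent(operatorId, k -> new ConcurrentSkipListMap<>())
         .put(windowId, object);
  }

  @Override
  public Object load(int operatorId, long windowId) throws IOException {
    Map<Long, Object> windows = store.get(operatorId);
    if (windows == null || !windows.containsKey(windowId)) {
      throw new IOException("no checkpoint for operator " + operatorId
          + " at window " + windowId);
    }
    return windows.get(windowId);
  }

  @Override
  public void delete(int operatorId, long windowId) {
    Map<Long, Object> windows = store.get(operatorId);
    if (windows != null) {
      windows.remove(windowId);
    }
  }

  @Override
  public long[] getWindowIds(int operatorId) {
    // Window ids come back in ascending order, since the backing map is sorted.
    ConcurrentSkipListMap<Long, Object> windows = store.get(operatorId);
    if (windows == null) {
      return new long[0];
    }
    return windows.keySet().stream().mapToLong(Long::longValue).toArray();
  }
}
```

[It would then be wired in the same way as any other agent, e.g. `dag.setAttribute(OperatorContext.STORAGE_AGENT, new InMemoryStorageAgent());`]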
>> 
>> Thanks
>> - Gaurav
>> 
>>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <gaurav@datatorrent.com> wrote:
>>> 
>>> Ashish,
>>> 
>>> You are right that exactly-once semantics can't be achieved through async
>>> FS writes.
>>> Did you try the new StorageAgent with your application? If yes, do you
>>> have any numbers to compare?
>>> 
>>> Thanks
>>> - Gaurav
>>> 
>>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <ashishtadose@gmail.com> wrote:
>>>> 
>>>> The application uses a large number of in-memory dimension store
>>>> partitions to hold high-cardinality aggregated data, and many
>>>> intermediate operators keep non-transient cache data for reference
>>>> lookups.
>>>> 
>>>> Total application partitions were more than 1000, which means many
>>>> operators to checkpoint and, in turn, many frequent HDFS write, rename &
>>>> delete operations, which became a bottleneck.
>>>> 
>>>> The application requires exactly-once semantics with idempotent
>>>> operators, which I suppose cannot be achieved through async FS writes;
>>>> please correct me if I'm wrong here.
>>>> 
>>>> Also, the application computes streaming aggregations over
>>>> high-cardinality incoming data streams, and reference caches are updated
>>>> frequently, so I'm not sure how much incremental checkpointing will help
>>>> here.
>>>> 
>>>> Setting aside this specific application, I strongly think it would be
>>>> good to have a StorageAgent backed by a distributed in-memory store as an
>>>> alternative in the platform.
>>>> 
>>>> Ashish
>>>> 
>>>> 
>>>> 
>>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <ram@datatorrent.com> wrote:
>>>> 
>>>>> Ashish,
>>>>> 
>>>>> In the current release, the HDFS writes are asynchronous, so I'm
>>>>> wondering if you could elaborate on how much latency you are observing
>>>>> both with and without checkpointing (i.e. after your changes to make
>>>>> operators stateless).
>>>>> 
>>>>> Also, any information on how much non-transient data is being
>>>>> checkpointed in each operator would be useful. There is an effort under
>>>>> way to implement incremental checkpointing, which should improve things
>>>>> when there is a lot of state but very little that changes from window to
>>>>> window.
>>>>> 
>>>>> Ram
>>>>> 
>>>>> 
>>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <ashishtadose@gmail.com> wrote:
>>>>> 
>>>>>> Hi All,
>>>>>> 
>>>>>> Currently the Apex engine provides operator checkpointing in HDFS (with
>>>>>> HDFS-backed StorageAgents, i.e. FSStorageAgent & AsyncFSStorageAgent).
>>>>>> 
>>>>>> We have observed that for applications with a large number of operator
>>>>>> instances, HDFS checkpointing introduces latency in the DAG, which
>>>>>> degrades overall application performance.
>>>>>> To resolve this we had to review all operators in the DAG and make a
>>>>>> few operators stateless.
>>>>>> 
>>>>>> As operator checkpointing is critical functionality of the Apex
>>>>>> streaming platform to ensure fault-tolerant behavior, the platform
>>>>>> should also provide alternate StorageAgents that work seamlessly with
>>>>>> large applications that require exactly-once semantics.
>>>>>> 
>>>>>> HDFS read/write latency is limited and doesn't improve beyond a certain
>>>>>> point because of disk I/O & staging writes. An alternate strategy that
>>>>>> does this checkpointing in a fault-tolerant distributed in-memory grid
>>>>>> would ensure application stability and performance are not impacted.
>>>>>> 
>>>>>> I have developed an in-memory storage agent which I would like to
>>>>>> contribute as an alternate StorageAgent for checkpointing.
>>>>>> 
>>>>>> Thanks,
>>>>>> Ashish
>>>>>> 
>>>>> 
>>> 
>> 
>> 

