apex-dev mailing list archives

From Chaitanya Chebolu <chaita...@datatorrent.com>
Subject Re: Inner Join Operator Using Managed State
Date Tue, 24 May 2016 10:46:18 GMT
Thanks, Chandni and Tim, for the valuable input.
I will use SpillableComplexComponent as the common abstraction.

Created a JIRA for this task:
https://issues.apache.org/jira/browse/APEXMALHAR-2100.

Regards,
Chaitanya

On Fri, May 20, 2016 at 11:39 AM, Timothy Farkas <
timothytiborfarkas@gmail.com> wrote:

> Hi Chandni,
>
> That is correct. I will provide some explanation about the motivation and
> usage of that interface:
>
> Goals:
>
> The goal of SpillableComplexComponent is to provide an interface for
> creating Spillable data structures. It is essentially the interface for a
> factory class which produces Spillable data structures. Having a factory
> interface allows different backends to be plugged into operators very
> easily.
>
> Going into more detail for your use case: SpillableComplexComponent has two
> factory methods, newSpillableByteMap and newSpillableByteArrayListMultimap,
> which return implementations of the SpillableByteMap and
> SpillableArrayListMultimap interfaces respectively.
>
> Usage:
>
> Setting the backend on an operator:
>
> myOperator.setStore(new InMemorySpillableComplexComponent());
> // myOperator.setStore(new ManagedStateSpillableComplexComponent());
> // myOperator.setStore(new HbaseSpillableComplexComponent());
>
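> Using the factory in an operator:
>
> public void setup(OperatorContext context) {
>    // the operator only sees the factory, not the backend behind it
>    map = store.newSpillableByteMap();
> }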
>
> As you can see, you set the factory on an operator, and the operator then
> uses the factory to create Spillable data structures. The operator is
> agnostic to the store that manages the data for those Spillable data
> structures. If you want the data to be stored in managed state, simply set a
> ManagedState implementation of SpillableComplexComponent on the operator; if
> you want the data to be stored in Cassandra, simply set a Cassandra
> implementation instead. With this design, the code using the spillable data
> structures is independent of the backend used to store the data.
>
> Thanks,
> Tim
>
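The factory idea Tim describes can be sketched in plain Java. This is a simplified illustration, not the Malhar API: SpillableFactory, InMemoryFactory and MyOperator are hypothetical names, the factory methods take no serializers or bucket ids, and the in-memory backend never actually spills. What it shows is that the operator depends only on the factory interface, so swapping backends means changing only the setStore() call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified factory in the spirit of SpillableComplexComponent.
// Real spillable structures would also need serializers and bucket ids.
interface SpillableFactory {
  <K, V> Map<K, V> newSpillableMap();

  <K, V> Map<K, List<V>> newSpillableListMultimap();
}

// In-memory backend: everything stays on the heap, nothing is spilled.
class InMemoryFactory implements SpillableFactory {
  @Override
  public <K, V> Map<K, V> newSpillableMap() {
    return new HashMap<>();
  }

  @Override
  public <K, V> Map<K, List<V>> newSpillableListMultimap() {
    return new HashMap<>();
  }
}

// The operator is backend-agnostic: it only talks to the factory.
class MyOperator {
  private SpillableFactory store;
  private Map<String, List<String>> map;

  void setStore(SpillableFactory store) {
    this.store = store;
  }

  void setup() {
    // Data structures come from whichever backend was set via setStore().
    map = store.newSpillableListMultimap();
  }

  void process(String key, String value) {
    map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  List<String> get(String key) {
    return map.getOrDefault(key, new ArrayList<>());
  }
}
```

A managed-state or HBase backend would implement the same interface and return structures that persist their contents; MyOperator itself would not change.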
> On Thu, May 19, 2016 at 9:29 PM, Chandni Singh <singh.chandni@gmail.com>
> wrote:
>
> > Hey Tim,
> >
> > As I understand it, the Join operator needs a common abstraction for
> > SpillableMap and SpillableArrayListMultimap.
> >
> > I suggested using SpillableComplexComponent. Is this correct?
> >
> > Thanks,
> > Chandni
> >
> > On Wed, May 18, 2016 at 1:34 AM, Chandni Singh <singh.chandni@gmail.com>
> > wrote:
> >
> > > Hi Chaitanya,
> > >
> > > I am NOT suggesting that you use the interface TimeSlicedBucketedState.
> > >
> > > I don't see the need for a JoinStore abstraction.
> > >
> > > There will be a SpillableArrayListMultimap implementation on which you can
> > > set "ManagedTimeUnifiedStateImpl" as the persistent store.
> > > The API of SpillableArrayListMultimap is sufficient for this use case.
> > >
> > > You can directly use this implementation of SpillableArrayListMultimap in
> > > the Join operator. Here is a simple example:
> > >
> > > class InnerJoinOperator
> > > {
> > >    // key -> list of values for tuples seen on stream 1, backed by managed state
> > >    SpillableArrayListMultimap stream1Data =
> > >        new SpillableArrayListMultimap(new ManagedTimeUnifiedStateImpl());
> > >
> > >    port1.process(tuple) {
> > >        stream1Data.put(tuple.getKey(), tuple.getVal());
> > >    }
> > > }
> > >
> > >
> > > Chandni
> > >
> > >
> > >
> > >
> > > On Wed, May 18, 2016 at 1:00 AM, Chaitanya Chebolu <
> > > chaitanya@datatorrent.com> wrote:
> > >
> > >> Hi Chandni,
> > >>
> > >>    I think you are suggesting the interface "TimeSlicedBucketedState". I
> > >> feel it is tightly coupled with Managed State.
> > >>    In the "TimeSlicedBucketedState" abstraction, the bucketId parameter
> > >> relates to Managed State, and it is not needed for the join operator.
> > >>
> > >> Regards,
> > >> Chaitanya
> > >>
> > >> On Wed, May 18, 2016 at 12:51 PM, Chandni Singh <singh.chandni@gmail.com>
> > >> wrote:
> > >>
> > >> > Chaitanya,
> > >> >
> > >> > SpillableArrayListMultimap will give you a similar abstraction.
> > >> >
> > >> > Why do we need another abstraction, "JoinStore"?
> > >> >
> > >> > Chandni
> > >> >
> > >> > On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <
> > >> > chaitanya@datatorrent.com> wrote:
> > >> >
> > >> > > Chandni,
> > >> > >
> > >> > >    JoinStore is an interface and consists of the following methods:
> > >> > > 1) boolean put(Object key, long time, Object value) => Inserts the (key,
> > >> > > value) pair into the store.
> > >> > > 2) List<> getTuples(Object key) => Returns the list of values in the store
> > >> > > to which the specified key is mapped.
> > >> > >
> > >> > >    JoinStore is pluggable into the join operator. The join operator exposes
> > >> > > the following properties:
> > >> > > JoinStore leftStore, rightStore.
> > >> > >
> > >> > > By default, leftStore and rightStore would be JoinStores using spillable
> > >> > > data structures over ManagedState. If users want to integrate a different
> > >> > > store, they have to implement JoinStore and set it on the Join operator.
> > >> > >
> > >> > >
> > >> > > Tim,
> > >> > >
> > >> > >    I am not planning any different implementation.
> > >> > >    Here, I proposed to plug the SpillableArrayListMultimap/SpillableMap
> > >> > > over managed state. I think this is what you are developing. Please
> > >> > > correct me if I am wrong.
> > >> > >
> > >> > > Regards,
> > >> > > Chaitanya
> > >> > >
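A minimal sketch of the JoinStore interface as Chaitanya describes it, with an in-memory implementation for the non-primary-key (many values per key) case. The List<Object> element type, the InMemoryJoinStore class and its purgeOlderThan() helper are assumptions for illustration. The thread does not say how the time argument is used, so here it is only retained so that entries falling outside the join window could be dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of the described interface; the element type of getTuples() is assumed.
interface JoinStore {
  // Insert the (key, value) pair, tagged with its time, into the store.
  boolean put(Object key, long time, Object value);

  // Return the values to which the key is mapped (empty if none).
  List<Object> getTuples(Object key);
}

// Hypothetical heap-only implementation: every value for a key is kept.
class InMemoryJoinStore implements JoinStore {
  private static final class Entry {
    final long time;
    final Object value;

    Entry(long time, Object value) {
      this.time = time;
      this.value = value;
    }
  }

  private final Map<Object, List<Entry>> data = new HashMap<>();

  @Override
  public boolean put(Object key, long time, Object value) {
    return data.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(time, value));
  }

  @Override
  public List<Object> getTuples(Object key) {
    List<Object> values = new ArrayList<>();
    for (Entry e : data.getOrDefault(key, Collections.emptyList())) {
      values.add(e.value);
    }
    return values;
  }

  // Hypothetical helper: drop entries older than the cutoff (e.g. now - t1).
  void purgeOlderThan(long cutoff) {
    Iterator<List<Entry>> it = data.values().iterator();
    while (it.hasNext()) {
      List<Entry> entries = it.next();
      entries.removeIf(e -> e.time < cutoff);
      if (entries.isEmpty()) {
        it.remove(); // forget keys with no remaining values
      }
    }
  }
}
```

Because the operator would only see the JoinStore type for leftStore and rightStore, a managed-state or spillable implementation could replace this one without changing the join logic.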
> > >> > > On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <
> > >> > > timothytiborfarkas@gmail.com> wrote:
> > >> > >
> > >> > > > Chaitanya,
> > >> > > >
> > >> > > > Are you planning to use the SpillableMap and SpillableArrayListMultimap
> > >> > > > that are in development, or separate implementations? If you are
> > >> > > > planning on using separate implementations, can you please explain why
> > >> > > > they are needed?
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Tim
> > >> > > >
> > >> > > > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <
> > >> > > > chaitanya@datatorrent.com> wrote:
> > >> > > >
> > >> > > > > Hi Chandni,
> > >> > > > >
> > >> > > > >   I am suggesting two stores, categorized by key type:
> > >> > > > > (1) If the key is a primary key, the store would be ManagedJoinStore or a
> > >> > > > > JoinStore using SpillableMap.
> > >> > > > > (2) If the key is not a primary key, the store would be a JoinStore using
> > >> > > > > SpillableArrayListMultimap.
> > >> > > > >
> > >> > > > > Regards,
> > >> > > > > Chaitanya
> > >> > > > >
> > >> > > > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <singh.chandni@gmail.com>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Hi Chaitanya,
> > >> > > > > >
> > >> > > > > > In the last discussion, we decided that for the Join operation we need
> > >> > > > > > the Spillable data structures, specifically SpillableArrayListMultimap,
> > >> > > > > > for the Join operator.
> > >> > > > > >
> > >> > > > > > Tim had created a ticket, APEXMALHAR-2070, to address this. The pull
> > >> > > > > > request for an in-memory implementation already exists:
> > >> > > > > > https://github.com/apache/incubator-apex-malhar/pull/262
> > >> > > > > >
> > >> > > > > > As commented on the ticket, Tim is working on the implementation of
> > >> > > > > > SpillableArrayListMultimap which uses ManagedState.
> > >> > > > > >
> > >> > > > > > The ManagedJoinStore seems to me like a duplicate of these spillable data
> > >> > > > > > structures. I think we should avoid creating multiple things that provide
> > >> > > > > > the same basic functionality.
> > >> > > > > >
> > >> > > > > > Can you please help review the spillable data structures pull request and
> > >> > > > > > point out what you need for Join that is missing there?
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Chandni
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <
> > >> > > > > > chaitanya@datatorrent.com> wrote:
> > >> > > > > >
> > >> > > > > > > Hi All,
> > >> > > > > > >
> > >> > > > > > >   Please go through this design and share your suggestions.
> > >> > > > > > >
> > >> > > > > > > Regards,
> > >> > > > > > > Chaitanya
> > >> > > > > > >
> > >> > > > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <mohit@datatorrent.com>
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > +1.
> > >> > > > > > > >
> > >> > > > > > > > This is one of the common use cases in batch scenarios, and it will
> > >> > > > > > > > be great to have this in streaming as well, using managed state and
> > >> > > > > > > > the spillableMultiMap structure.
> > >> > > > > > > >
> > >> > > > > > > > Regards,
> > >> > > > > > > > Mohit
> > >> > > > > > > >
> > >> > > > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <
> > >> > > > > > > > devendra.vyavahare@gmail.com> wrote:
> > >> > > > > > > >
> > >> > > > > > > > > +1 for the proposal.
> > >> > > > > > > > >
> > >> > > > > > > > > This will be useful for joins involving large amounts of data
> > >> > > > > > > > > that must be held intermediately.
> > >> > > > > > > > >
> > >> > > > > > > > > ~ Yogi
> > >> > > > > > > > >
> > >> > > > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <
> > >> > > > > > > > > chaitanya@datatorrent.com> wrote:
> > >> > > > > > > > >
> > >> > > > > > > > > > Hi,
> > >> > > > > > > > > >
> > >> > > > > > > > > >    The Malhar library has an in-memory join operator, which can't
> > >> > > > > > > > > > process large amounts of data because of checkpointing and memory
> > >> > > > > > > > > > bottlenecks. To avoid these, I am proposing an inner join operator
> > >> > > > > > > > > > using Managed State, which was recently added to Malhar.
> > >> > > > > > > > > >
> > >> > > > > > > > > > Details of the Inner Join operator, Managed State, and the design
> > >> > > > > > > > > > of the Inner Join operator using Managed State are given below:
> > >> > > > > > > > > >
> > >> > > > > > > > > > Inner Join Operator
> > >> > > > > > > > > > --------------------------
> > >> > > > > > > > > > The Join operator pairs tuples from two relational streams that
> > >> > > > > > > > > > match the join condition and emits the paired tuples.
> > >> > > > > > > > > >
> > >> > > > > > > > > > For example, let S1 and S2 be two input streams with the join
> > >> > > > > > > > > > condition
> > >> > > > > > > > > > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1,
> > >> > > > > > > > > > where a and b are tuples arriving on S1 and S2 respectively, and
> > >> > > > > > > > > > t1 is the time period.
> > >> > > > > > > > > >
> > >> > > > > > > > > > Based on the above join condition, the join operator needs to
> > >> > > > > > > > > > retain tuples for the time period t1. Let JS1 and JS2 be the
> > >> > > > > > > > > > stores in which the tuples of S1 and S2, respectively, are kept.
> > >> > > > > > > > > >
> > >> > > > > > > > > > Let (k1, v1) be a tuple arriving on stream S1. The join operation
> > >> > > > > > > > > > takes the following steps:
> > >> > > > > > > > > >
> > >> > > > > > > > > >    1. Insert (k1, v1) into the store JS1.
> > >> > > > > > > > > >    2. For key k1, get the values from the store JS2.
> > >> > > > > > > > > >    3. If step 2 returns a non-empty set, apply the join conditions
> > >> > > > > > > > > >    to each value and emit the matching pairs.
> > >> > > > > > > > > >
> > >> > > > > > > > > > The same procedure applies if the tuple arrives on stream S2,
> > >> > > > > > > > > > with JS1 and JS2 swapped in the steps above.
> > >> > > > > > > > > >
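The three join steps can be sketched in Java as follows. This is an in-memory illustration only, assuming String keys and values and plain HashMaps standing in for JS1 and JS2; there is no spilling and no expiry of old tuples, and InnerJoinSketch and its methods are hypothetical names rather than part of Malhar.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the inner-join steps (no spilling, no expiry).
class InnerJoinSketch {
  static final class Tuple {
    final String key;
    final long time;
    final String value;

    Tuple(String key, long time, String value) {
      this.key = key;
      this.time = time;
      this.value = value;
    }
  }

  private final long t1; // join window: |a.time - b.time| < t1
  private final Map<String, List<Tuple>> js1 = new HashMap<>(); // store for S1
  private final Map<String, List<Tuple>> js2 = new HashMap<>(); // store for S2
  final List<String> output = new ArrayList<>(); // emitted pairs as "s1Value,s2Value"

  InnerJoinSketch(long t1) {
    this.t1 = t1;
  }

  void processS1(Tuple a) {
    process(a, js1, js2, true);
  }

  void processS2(Tuple b) {
    process(b, js2, js1, false);
  }

  private void process(Tuple t, Map<String, List<Tuple>> own,
                       Map<String, List<Tuple>> other, boolean fromS1) {
    // Step 1: insert the tuple into its own stream's store.
    own.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
    // Step 2: look up the same key in the other stream's store.
    List<Tuple> candidates = other.get(t.key);
    if (candidates == null) {
      return;
    }
    // Step 3: apply the time condition and emit matches, S1 value first.
    for (Tuple c : candidates) {
      if (Math.abs(t.time - c.time) < t1) {
        output.add(fromS1 ? t.value + "," + c.value : c.value + "," + t.value);
      }
    }
  }
}
```

For example, with t1 = 10, an S1 tuple at time 100 and an S2 tuple with the same key at time 105 are paired, while an S2 tuple at time 120 is not. A real operator would also drop tuples older than t1 so the stores stay bounded.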
> > >> > > > > > > > > > Managed State
> > >> > > > > > > > > > --------------------
> > >> > > > > > > > > > Managed State is an incrementally checkpointed, fault-tolerant
> > >> > > > > > > > > > key-value data structure. For more information about Managed
> > >> > > > > > > > > > State, please see:
> > >> > > > > > > > > >
> > >> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > >> > > > > > > > > >
> > >> > > > > > > > > > Abstract Join Operator
> > >> > > > > > > > > > --------------------------------
> > >> > > > > > > > > > An abstract implementation of the Join operator is available in
> > >> > > > > > > > > > the com.datatorrent.lib.join package. JoinStore, the store
> > >> > > > > > > > > > interface used by the join operator, is available in the same
> > >> > > > > > > > > > package. For more details about the join operator, please see:
> > >> > > > > > > > > >
> > >> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > >> > > > > > > > > >
> > >> > > > > > > > > > Design of Join Operator Using Managed State
> > >> > > > > > > > > > -------------------------------------------------------------
> > >> > > > > > > > > > We need to provide a concrete store using Managed State, which
> > >> > > > > > > > > > would look like the following:
> > >> > > > > > > > > >
> > >> > > > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl implements
> > >> > > > > > > > > >     JoinStore
> > >> > > > > > > > > > {
> > >> > > > > > > > > >   // put() and getTuples() from JoinStore would be implemented over managed state
> > >> > > > > > > > > > }
> > >> > > > > > > > > >
> > >> > > > > > > > > > Data in the Managed store is in the form of a map from byte[] to
> > >> > > > > > > > > > byte[].
> > >> > > > > > > > > >
> > >> > > > > > > > > > If the value is a list, inserting (k1, v1) into the store takes the
> > >> > > > > > > > > > following steps:
> > >> > > > > > > > > > (1) For the key k1, get the current value from the same store (JS1
> > >> > > > > > > > > > in the example above). If the key is not present in memory, we
> > >> > > > > > > > > > need to search for it in all the time buckets.
> > >> > > > > > > > > > (2) Convert that value to a List.
> > >> > > > > > > > > > (3) Add the value v1 to the list.
> > >> > > > > > > > > > (4) Convert the new list to a Slice.
> > >> > > > > > > > > > (5) Insert (k1, new slice) into the Managed State.
> > >> > > > > > > > > >
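Steps (1) to (5) amount to a read-modify-write on a byte[] to byte[] store. The sketch below assumes a HashMap in place of Managed State and Java serialization in place of whatever serializer the real store would use, and it does not model the search across time buckets. ByteListStore is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Appending to a list value held as bytes in a byte-keyed, byte-valued store.
class ByteListStore {
  // ByteBuffer keys compare by content; raw byte[] keys would compare by identity.
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  private static ByteBuffer keyOf(String key) {
    return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
  }

  void append(String key, String value) {
    ByteBuffer k = keyOf(key);
    byte[] current = store.get(k);                  // (1) current value, or null
    ArrayList<String> list =
        current == null ? new ArrayList<>() : deserialize(current); // (2) bytes -> list
    list.add(value);                                // (3) add the new value
    byte[] updated = serialize(list);               // (4) list -> bytes
    store.put(k, updated);                          // (5) write back under the same key
  }

  List<String> get(String key) {
    byte[] current = store.get(keyOf(key));
    return current == null ? new ArrayList<>() : deserialize(current);
  }

  private static byte[] serialize(ArrayList<String> list) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(list);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  @SuppressWarnings("unchecked")
  private static ArrayList<String> deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (ArrayList<String>) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Every append in this sketch deserializes and rewrites the whole list, so its cost grows with the number of values stored under the key.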
> > >> > > > > > > > > > Values as lists have been discussed here:
> > >> > > > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > >> > > > > > > > > >
> > >> > > > > > > > > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > >> > > > > > > > > > structure for lists of values. Let this store be
> > >> > > > > > > > > > SpillableMultiMapJoinStore.
> > >> > > > > > > > > >
> > >> > > > > > > > > > Based on the above details, the store is categorized by key type:
> > >> > > > > > > > > >
> > >> > > > > > > > > >    Key            Store
> > >> > > > > > > > > >    -----------------------------------------
> > >> > > > > > > > > >    Primary        ManagedJoinStore
> > >> > > > > > > > > >    Not Primary    SpillableMultiMapJoinStore
> > >> > > > > > > > > >
> > >> > > > > > > > > > The Join operator would expose the following additional
> > >> > > > > > > > > > properties:
> > >> > > > > > > > > >
> > >> > > > > > > > > >    - isPrimaryKey - whether the key is a primary key.
> > >> > > > > > > > > >    - noOfBuckets - number of key buckets. This parameter is
> > >> > > > > > > > > >    required for Managed State.
> > >> > > > > > > > > >
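One possible way the two properties could be used, sketched under assumptions: isPrimaryKey selects between a store that keeps one value per key (map semantics, as in the "Primary" row) and one that keeps all values per key (multimap semantics, as in the "Not Primary" row), and noOfBuckets is used to assign a key to a bucket by hashing. KeyedStore, PrimaryKeyStore, MultiValueStore, JoinStoreConfig and the hash-based bucketOf() are hypothetical; the thread does not say how keys are mapped to buckets.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical common view of both store kinds.
interface KeyedStore {
  void put(Object key, Object value);

  List<Object> get(Object key);
}

// Primary key: at most one value per key, so a later put replaces the earlier one.
class PrimaryKeyStore implements KeyedStore {
  private final Map<Object, Object> map = new HashMap<>();

  public void put(Object key, Object value) {
    map.put(key, value);
  }

  public List<Object> get(Object key) {
    return map.containsKey(key)
        ? Collections.singletonList(map.get(key))
        : Collections.<Object>emptyList();
  }
}

// Not a primary key: every value for the key is kept, in arrival order.
class MultiValueStore implements KeyedStore {
  private final Map<Object, List<Object>> map = new HashMap<>();

  public void put(Object key, Object value) {
    map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  public List<Object> get(Object key) {
    return map.getOrDefault(key, Collections.emptyList());
  }
}

class JoinStoreConfig {
  boolean isPrimaryKey;
  int noOfBuckets = 8; // hypothetical default

  KeyedStore newStore() {
    return isPrimaryKey ? new PrimaryKeyStore() : new MultiValueStore();
  }

  // Assumption: keys are spread over buckets by hash code.
  long bucketOf(Object key) {
    return Math.floorMod(key.hashCode(), noOfBuckets);
  }
}
```

Math.floorMod keeps the bucket id in [0, noOfBuckets) even for negative hash codes, which a plain % would not.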
> > >> > > > > > > > > > Please provide your suggestions.
> > >> > > > > > > > > >
> > >> > > > > > > > > > Regards,
> > >> > > > > > > > > > Chaitanya
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
