apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chandni Singh <singh.chan...@gmail.com>
Subject Re: Inner Join Operator Using Managed State
Date Wed, 18 May 2016 07:21:53 GMT
Chaitanya,

SpillableArrayListMultimap will provide gives you similar abstraction.

Why do we need  another abstraction "Join Store" ?

Chandni

On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <
chaitanya@datatorrent.com> wrote:

> Chandni,
>
>    JoinStore is an interface and consists of following methods:
> 1) boolean put(Object key, long time, Object value) => Insert (Key, Value)
> pair into the store.
> 2) List<> getTuples(Object key) => Return the list of values from store to
> which the specified key is mapped.
>
>    JoinStore is a plug-able to the join operator. Below properties exposed
> from join operator:
> JoinStore leftStore, rightStore.
>
> By default, leftStore & rightStore would be the Join Store using spillable
> data-structures over ManagedState. If the user wants to integrate different
> store, he/she has to implements the JoinStore and set it to the Join
> operator.
>
>
> Tim,
>
>    I am not planning any different implementation.
>    Here, I proposed to plug the SpillableArrayListMultiMap/SplillableMap
> over managed state. I think this is what you are developing. Please correct
> it, if I am wrong.
>
> Regards,
> Chaitanya
>
> On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <
> timothytiborfarkas@gmail.com> wrote:
>
> > Chaitanya,
> >
> > Are you planning to use the SpillableMap and SpillableArrayListMultiMap
> > that are in development, or separate implementations? If you are planning
> > on using separate implementations can you please explain why they are
> > needed?
> >
> > Thanks,
> > Tim
> >
> > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <
> > chaitanya@datatorrent.com> wrote:
> >
> > > Hi Chandni,
> > >
> > >   I am suggesting two stores and categorized those based on key type:
> > > (1) If the key is Primary Key then store would be ManagedJoinStore or
> > Join
> > > store using SpillableMap.
> > > (2) If the key is not the Primary Key then store would be Join store
> > using
> > > SpillableArrayListMultiMap.
> > >
> > > Regards,
> > > Chaitanya
> > >
> > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <
> singh.chandni@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi Chaitanya,
> > > >
> > > > In the last discussion, we decided that for Join operation we needed
> > the
> > > > Spillable DataStructures- specifically SpillableArrayListMultiMap for
> > the
> > > > Join Operator.
> > > >
> > > > Tim had created a ticket APEXMALHAR-2070 to address this.  The pull
> > > request
> > > > for an in-memory implementation already exists:
> > > > https://github.com/apache/incubator-apex-malhar/pull/262
> > > >
> > > > As commented on the ticket, Tim is working on the implementation of
> > > > SpillableArrayListMultiMap which uses ManagedState.
> > > >
> > > > The ManagedJoinStore seems to me like a duplicate of these spoilable
> > data
> > > > structure. I think we should avoid creating multiple things that
> > provide
> > > > the same basic functionality.
> > > >
> > > > Can you please help review the spillable data structures pull request
> > and
> > > > point out what you will need for Join that is missing there?
> > > >
> > > > Thanks,
> > > > Chandni
> > > >
> > > >
> > > >
> > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <
> > > > chaitanya@datatorrent.com> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > >   Please go through this design and share your suggestions.
> > > > >
> > > > > Regards,
> > > > > Chaitanya
> > > > >
> > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <
> > mohit@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > +1.
> > > > > >
> > > > > > This is one of the common use cases in batch scenarios and it
> will
> > be
> > > > > great
> > > > > > to have this in a stream as well using managed state,
> > > spillableMultiMap
> > > > > > structure.
> > > > > >
> > > > > > Regards,
> > > > > > Mohit
> > > > > >
> > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <
> > > > > > devendra.vyavahare@gmail.com
> > > > > > > wrote:
> > > > > >
> > > > > > > +1 for the Proposal.
> > > > > > >
> > > > > > > This will be useful for joins involving large amount of
data to
> > be
> > > > held
> > > > > > > intermediately.
> > > > > > >
> > > > > > > ~ Yogi
> > > > > > >
> > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <
> > > chaitanya@datatorrent.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi ,
> > > > > > > >
> > > > > > > >    Malhar library has in-memory join operator and
this can't
> > > > process
> > > > > > > large
> > > > > > > > amount of data because of checkpoint and Memory bottlenecks.
> To
> > > > avoid
> > > > > > > > these, I am proposing inner join operator using Managed
State
> > > that
> > > > is
> > > > > > > > recently added to Malhar.
> > > > > > > >
> > > > > > > > Details of Inner Join operator, Managed State and
Design for
> > > Inner
> > > > > Join
> > > > > > > > operator using Managed State are given below:
> > > > > > > >
> > > > > > > > Inner Join Operator
> > > > > > > > --------------------------
> > > > > > > > Join Operator pairs tuples from two relational streams
that
> > > matches
> > > > > the
> > > > > > > > join condition and emits the paired tuples.
> > > > > > > >
> > > > > > > > For example, let's say S1 and S2 are two input streams
and
> join
> > > > > > condition
> > > > > > > > is
> > > > > > > > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time|
< t1
> > > > > > > > where a and b are tuples coming on S1 and S2 respectively,
t1
> > is
> > > > the
> > > > > > time
> > > > > > > > period
> > > > > > > >
> > > > > > > > Based on the above join condition, join operator needs
to
> store
> > > > > tuples
> > > > > > > for
> > > > > > > > "t1" time. Let's say JS1 and JS2 are their respective
stores
> > > where
> > > > > > these
> > > > > > > > tuples are stored.
> > > > > > > >
> > > > > > > > Let (k1, v1) be the tuple coming on Stream S1.  Following
are
> > the
> > > > > steps
> > > > > > > to
> > > > > > > > be done for join operation:
> > > > > > > >
> > > > > > > >    1. Insert (k1, v1) into the store JS1.
> > > > > > > >    2. For Key k1, get the values from store JS2.
> > > > > > > >    3. If step 2 returns non-null set, apply join conditions
> > > > > > > >
> > > > > > > > The similar procedure is applied if the tuple is on
Stream
> S2,
> > > > > swapping
> > > > > > > JS1
> > > > > > > > and JS2 in the steps above.
> > > > > > > >
> > > > > > > > Managed State
> > > > > > > > --------------------
> > > > > > > > Managed State is an incremental check-pointing and
> > fault-tolerant
> > > > key
> > > > > > > value
> > > > > > > > data structure. For more info about Managed State,
please
> have
> > a
> > > > look
> > > > > > at
> > > > > > > > the below link:
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > > > > > > >
> > > > > > > > Abstract Join Operator
> > > > > > > > --------------------------------
> > > > > > > > Abstract implementation of Join operator is available
at
> > > > > > > > com.datatorrent.lib.join package. JoinStore is a store
> > interface
> > > > used
> > > > > > in
> > > > > > > > join operator and available at the same package.
> > > > > > > > For more details about the join operator,  please
have a look
> > at
> > > > the
> > > > > > > below
> > > > > > > > link:
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > > > > > > >
> > > > > > > > Design of Join Operator Using Managed State
> > > > > > > > -------------------------------------------------------------
> > > > > > > > We need to provide a concrete store using Managed
State and
> > would
> > > > be
> > > > > > like
> > > > > > > > as below:
> > > > > > > >
> > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl
> > > > implements
> > > > > > > > JoinStore
> > > > > > > > {
> > > > > > > > }
> > > > > > > >
> > > > > > > > Data in Managed store would be in the form of Map
of byte[]
> to
> > > > > byte[].
> > > > > > > >
> > > > > > > > If the value is list, then inserting (k1, v1) into
the store
> > > would
> > > > be
> > > > > > the
> > > > > > > > following steps:
> > > > > > > > (1) For the key k1, get the value from store JS2.
(Here we
> need
> > > to
> > > > > > search
> > > > > > > > it in all the time buckets, if the key is not present
in
> > Memory)
> > > > > > > > (2) Convert the above value to List.
> > > > > > > > (3) Add the value v1 to the above list.
> > > > > > > > (4) Convert the new list to slice.
> > > > > > > > (5) Insert (k1, new slice) into the Managed state.
> > > > > > > >
> > > > > > > > Values as list has been discussed here:
> > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > > > > > > >
> > > > > > > > Based on above JIRA, I am suggesting SpillableMultiMap
data
> > > > structure
> > > > > > for
> > > > > > > > list of values. Let the store would be
> > > SpillableMultiMapJoinStore.
> > > > > > > >
> > > > > > > > Based on above details, store is categorized based
on key
> type.
> > > > > > > >
> > > > > > > >          Key                                     
     Store
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> --------------------------------------------------------------------------
> > > > > > > > Primary                                    ManagedJoinStore
> > > > > > > > Not Primary
> > > SpillableMultiMapJoinStore
> > > > > > > >
> > > > > > > > Following additional properties would be exposed by
the Join
> > > > > operator:
> > > > > > > >
> > > > > > > >    - isPrimaryKey - whether the key is primary or
not.
> > > > > > > >    - noOfBuckets - Number of key buckets. This parameter
is
> > > > required
> > > > > > for
> > > > > > > >    Managed State.
> > > > > > > >
> > > > > > > > Please provide your suggestions.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Chaitanya
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message