apex-dev mailing list archives

From Chaitanya Chebolu <chaita...@datatorrent.com>
Subject Re: Inner Join Operator Using Managed State
Date Tue, 17 May 2016 17:36:33 GMT
Hi Chandni,

  I am suggesting two stores, categorized by key type:
(1) If the key is a primary key, the store would be ManagedJoinStore or a
join store using SpillableMap.
(2) If the key is not a primary key, the store would be a join store using
SpillableArrayListMultiMap.

Regards,
Chaitanya

On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <singh.chandni@gmail.com>
wrote:

> Hi Chaitanya,
>
> In the last discussion, we decided that the Join operator needs the
> spillable data structures, specifically SpillableArrayListMultiMap.
>
> Tim has created ticket APEXMALHAR-2070 to address this. A pull request
> for an in-memory implementation already exists:
> https://github.com/apache/incubator-apex-malhar/pull/262
>
> As noted on the ticket, Tim is working on an implementation of
> SpillableArrayListMultiMap that uses ManagedState.
>
> The ManagedJoinStore seems to me like a duplicate of these spillable data
> structures. I think we should avoid creating multiple things that provide
> the same basic functionality.
>
> Can you please help review the spillable data structures pull request and
> point out anything Join needs that is missing there?
>
> Thanks,
> Chandni
>
>
>
> On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <chaitanya@datatorrent.com>
> wrote:
>
> > Hi All,
> >
> >   Please go through this design and share your suggestions.
> >
> > Regards,
> > Chaitanya
> >
> > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <mohit@datatorrent.com>
> > wrote:
> >
> > > +1.
> > >
> > > This is one of the common use cases in batch scenarios, and it will be
> > > great to have it for streams as well, using managed state and the
> > > spillableMultiMap structure.
> > >
> > > Regards,
> > > Mohit
> > >
> > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <devendra.vyavahare@gmail.com>
> > > wrote:
> > >
> > > > +1 for the Proposal.
> > > >
> > > > This will be useful for joins that need to hold large amounts of data
> > > > intermediately.
> > > >
> > > > ~ Yogi
> > > >
> > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <chaitanya@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > The Malhar library has an in-memory join operator, which cannot process
> > > > > large amounts of data because of checkpointing and memory bottlenecks.
> > > > > To avoid these, I am proposing an inner join operator using Managed
> > > > > State, which was recently added to Malhar.
> > > > >
> > > > > Details of the inner join operator, Managed State, and the design of an
> > > > > inner join operator using Managed State are given below:
> > > > >
> > > > > Inner Join Operator
> > > > > --------------------------
> > > > > The join operator pairs tuples from two relational streams that match
> > > > > the join condition and emits the paired tuples.
> > > > >
> > > > > For example, let S1 and S2 be two input streams with the join condition
> > > > >
> > > > >     (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > > > >
> > > > > where a and b are tuples arriving on S1 and S2 respectively, and t1 is
> > > > > the time period.
> > > > >
> > > > > Given this join condition, the join operator needs to store tuples for
> > > > > time t1. Let JS1 and JS2 be the stores that hold the tuples from S1 and
> > > > > S2 respectively.
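A minimal Java sketch of what "store tuples for time t1" implies for one store, assuming tuples arrive in non-decreasing time order (the proposal does not say how expiry is handled). WindowRetentionSketch, add, expire and size are hypothetical names, and the TreeMap only stands in for a store such as JS1 or JS2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: keep stored tuples only as long as they can still satisfy
// |a.time - b.time| < t1. Assumes tuples arrive in non-decreasing time order.
class WindowRetentionSketch {
  private final long t1;
  // Stored tuples grouped by timestamp, in ascending time order.
  private final TreeMap<Long, List<String>> byTime = new TreeMap<>();

  WindowRetentionSketch(long t1) {
    this.t1 = t1;
  }

  void add(long time, String tuple) {
    byTime.computeIfAbsent(time, t -> new ArrayList<>()).add(tuple);
  }

  // A stored tuple at time tau can match a future tuple at time T >= currentTime
  // only if T - tau < t1. Once currentTime - tau >= t1 it can never match again,
  // so every entry with tau <= currentTime - t1 is dropped.
  void expire(long currentTime) {
    byTime.headMap(currentTime - t1, true).clear();
  }

  int size() {
    int n = 0;
    for (List<String> tuples : byTime.values()) {
      n += tuples.size();
    }
    return n;
  }

  public static void main(String[] args) {
    WindowRetentionSketch js1 = new WindowRetentionSketch(10);
    js1.add(100, "a1");
    js1.add(105, "a2");
    js1.expire(112); // drops the tuple at time 100, keeps the one at 105
    System.out.println(js1.size()); // 1
  }
}
```

With out-of-order arrival this eviction rule would drop tuples too early, so a real store would need some notion of how late data may arrive.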
> > > > >
> > > > > Let (k1, v1) be a tuple arriving on stream S1. The join operation
> > > > > performs the following steps:
> > > > >
> > > > >    1. Insert (k1, v1) into the store JS1.
> > > > >    2. For key k1, get the values from the store JS2.
> > > > >    3. If step 2 returns a non-empty set, apply the join condition to
> > > > >       each value and emit the matching pairs.
> > > > >
> > > > > The same procedure applies to a tuple arriving on stream S2, with JS1
> > > > > and JS2 swapped in the steps above.
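The three steps can be sketched in Java as follows. This is a hypothetical in-memory illustration, not the Malhar operator: InnerJoinSketch, Tuple, processS1 and processS2 are made-up names, and plain HashMaps stand in for the JS1 and JS2 stores. The key equality part of the condition is implied by looking up the same key in the other store, so only the time condition is checked explicitly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the per-tuple inner join steps.
// Plain HashMaps stand in for the JS1 and JS2 stores.
class InnerJoinSketch {
  // A tuple with a join key, a timestamp and a payload.
  static final class Tuple {
    final String key;
    final long time;
    final String value;

    Tuple(String key, long time, String value) {
      this.key = key;
      this.time = time;
      this.value = value;
    }
  }

  private final long t1; // join window: |a.time - b.time| < t1
  private final Map<String, List<Tuple>> js1 = new HashMap<>();
  private final Map<String, List<Tuple>> js2 = new HashMap<>();

  InnerJoinSketch(long t1) {
    this.t1 = t1;
  }

  // Tuple arriving on S1: insert into JS1, probe JS2.
  List<String> processS1(Tuple a) {
    return process(a, js1, js2, true);
  }

  // Tuple arriving on S2: the same steps with JS1 and JS2 swapped.
  List<String> processS2(Tuple b) {
    return process(b, js2, js1, false);
  }

  private List<String> process(Tuple t, Map<String, List<Tuple>> own,
      Map<String, List<Tuple>> other, boolean fromS1) {
    // Step 1: insert the tuple into its own stream's store.
    own.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
    // Step 2: get the values stored under the same key in the other store.
    List<Tuple> candidates = other.getOrDefault(t.key, Collections.emptyList());
    // Step 3: apply the time condition and emit "S1 value|S2 value" pairs.
    List<String> out = new ArrayList<>();
    for (Tuple o : candidates) {
      if (Math.abs(t.time - o.time) < t1) {
        out.add(fromS1 ? t.value + "|" + o.value : o.value + "|" + t.value);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    InnerJoinSketch join = new InnerJoinSketch(10);
    join.processS1(new Tuple("k1", 100, "a1"));
    System.out.println(join.processS2(new Tuple("k1", 105, "b1"))); // [a1|b1]
    System.out.println(join.processS2(new Tuple("k1", 200, "b2"))); // []
  }
}
```

The sketch never removes tuples older than t1, so its stores grow without bound; that is exactly the memory problem a persistent, checkpointed store is meant to address.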
> > > > >
> > > > > Managed State
> > > > > --------------------
> > > > > Managed State is a fault-tolerant key-value data structure with
> > > > > incremental checkpointing. For more information about Managed State,
> > > > > see:
> > > > >
> > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > > > >
> > > > > Abstract Join Operator
> > > > > --------------------------------
> > > > > An abstract implementation of the join operator is available in the
> > > > > com.datatorrent.lib.join package. JoinStore, the store interface used
> > > > > by the join operator, is in the same package. For more details about
> > > > > the join operator, see:
> > > > >
> > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > > > >
> > > > > Design of Join Operator Using Managed State
> > > > > -------------------------------------------------------------
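> > > > > We need to provide a concrete store based on Managed State, along
> > > > > these lines:
> > > > >
> > > > > import com.datatorrent.lib.join.JoinStore;
> > > > > import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
> > > > >
> > > > > public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
> > > > > {
> > > > >   // JoinStore methods to be implemented on top of Managed State
> > > > > }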
> > > > >
> > > > > Data in the Managed State store would be a map of byte[] to byte[].
> > > > >
> > > > > If the value is a list, inserting (k1, v1) into the store would take
> > > > > the following steps:
> > > > > (1) For the key k1, get the current value from the same store (JS1 in
> > > > >     the example above). If the key is not in memory, all the time
> > > > >     buckets need to be searched.
> > > > > (2) Convert the value to a list.
> > > > > (3) Add v1 to the list.
> > > > > (4) Convert the new list to a slice.
> > > > > (5) Insert (k1, new slice) into Managed State.
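A sketch of steps (1) to (5), assuming a byte[] to byte[] store. The HashMap (with ByteBuffer keys, so that byte arrays compare by content) is only a stand-in for Managed State, plain byte[] values take the place of slices, ListValueStoreSketch is a hypothetical name, and the length-prefixed string encoding is an arbitrary choice for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: appending to a list value in a byte[] -> byte[] store.
// The HashMap stands in for Managed State; ByteBuffer keys compare by content.
class ListValueStoreSketch {
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  private static ByteBuffer keyOf(String key) {
    return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
  }

  void append(String key, String value) {
    ByteBuffer k = keyOf(key);
    // (1) Get the current serialized value for the key (null if absent).
    byte[] current = store.get(k);
    // (2) Convert it to a list.
    List<String> list = current == null ? new ArrayList<String>() : decode(current);
    // (3) Add the new value.
    list.add(value);
    // (4) Serialize the whole list again and (5) write it back under the key.
    store.put(k, encode(list));
  }

  List<String> get(String key) {
    byte[] bytes = store.get(keyOf(key));
    return bytes == null ? new ArrayList<String>() : decode(bytes);
  }

  // Encoding: element count, then each string in modified UTF-8.
  static byte[] encode(List<String> list) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      out.writeInt(list.size());
      for (String s : list) {
        out.writeUTF(s);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.toByteArray();
  }

  static List<String> decode(byte[] bytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      int n = in.readInt();
      List<String> list = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        list.add(in.readUTF());
      }
      return list;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    ListValueStoreSketch js1 = new ListValueStoreSketch();
    js1.append("k1", "v1");
    js1.append("k1", "v2");
    System.out.println(js1.get("k1")); // [v1, v2]
  }
}
```

Every insert deserializes and re-serializes the whole list for its key, so the cost of an insert grows with the number of values per key; this is presumably one reason a dedicated multimap structure is suggested for lists of values.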
> > > > >
> > > > > Storing values as lists has been discussed here:
> > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > > > >
> > > > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > > > > structure for lists of values. Let this store be called
> > > > > SpillableMultiMapJoinStore.
> > > > >
> > > > > Based on the above details, the store is chosen by key type:
> > > > >
> > > > >   Key            Store
> > > > >   -------------  --------------------------
> > > > >   Primary        ManagedJoinStore
> > > > >   Not primary    SpillableMultiMapJoinStore
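The store selection by key type can be illustrated with a hypothetical sketch: SimpleJoinStore, PrimaryKeyStore, MultiValueStore and create are made-up names, and in-memory maps stand in for ManagedJoinStore and SpillableMultiMapJoinStore. With a primary key each key has at most one value, so a plain map suffices; otherwise values for the same key accumulate in a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of choosing a join store by key type. In-memory maps stand in
// for ManagedJoinStore (primary key) and SpillableMultiMapJoinStore (non-primary key).
class JoinStoreSelection {
  interface SimpleJoinStore<K, V> {
    void put(K key, V value);

    List<V> get(K key);
  }

  // Primary key: at most one value per key, so a plain key -> value map suffices.
  static final class PrimaryKeyStore<K, V> implements SimpleJoinStore<K, V> {
    private final Map<K, V> map = new HashMap<>();

    public void put(K key, V value) {
      map.put(key, value);
    }

    public List<V> get(K key) {
      V v = map.get(key);
      return v == null ? Collections.<V>emptyList() : Collections.singletonList(v);
    }
  }

  // Non-primary key: many values per key, so values accumulate in a list (multimap).
  static final class MultiValueStore<K, V> implements SimpleJoinStore<K, V> {
    private final Map<K, List<V>> map = new HashMap<>();

    public void put(K key, V value) {
      map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public List<V> get(K key) {
      return map.getOrDefault(key, Collections.<V>emptyList());
    }
  }

  // Mirrors the proposed isPrimaryKey property.
  static <K, V> SimpleJoinStore<K, V> create(boolean isPrimaryKey) {
    return isPrimaryKey ? new PrimaryKeyStore<K, V>() : new MultiValueStore<K, V>();
  }

  public static void main(String[] args) {
    SimpleJoinStore<String, String> pk = create(true);
    pk.put("id1", "row1");
    SimpleJoinStore<String, String> multi = create(false);
    multi.put("dept", "alice");
    multi.put("dept", "bob");
    System.out.println(pk.get("id1") + " " + multi.get("dept")); // [row1] [alice, bob]
  }
}
```

In this sketch a second put for the same primary key overwrites the first, which assumes a repeated primary key represents an update rather than a second row.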
> > > > >
> > > > > The Join operator would expose the following additional properties:
> > > > >
> > > > >    - isPrimaryKey: whether the key is a primary key.
> > > > >    - noOfBuckets: number of key buckets. This parameter is required
> > > > >      by Managed State.
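noOfBuckets implies that each key has to be mapped to one of the buckets. The proposal does not say how; the sketch below assumes a hash-based mapping, and KeyBucketSketch and bucketFor are hypothetical names. The property that matters is determinism: a lookup for a key must go to the same bucket its values were written to.

```java
import java.util.Objects;

// Hypothetical sketch: mapping a join key to one of noOfBuckets key buckets.
// Hashing the key is an assumption; the proposal does not specify the mapping.
class KeyBucketSketch {
  private final int noOfBuckets;

  KeyBucketSketch(int noOfBuckets) {
    if (noOfBuckets <= 0) {
      throw new IllegalArgumentException("noOfBuckets must be positive");
    }
    this.noOfBuckets = noOfBuckets;
  }

  // Equal keys always get the same bucket id. floorMod keeps the id in
  // [0, noOfBuckets) even when hashCode() is negative; a null key maps to 0.
  int bucketFor(Object key) {
    return Math.floorMod(Objects.hashCode(key), noOfBuckets);
  }

  public static void main(String[] args) {
    KeyBucketSketch buckets = new KeyBucketSketch(16);
    System.out.println(buckets.bucketFor("k1") == buckets.bucketFor("k1")); // true
  }
}
```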
> > > > >
> > > > > Please provide your suggestions.
> > > > >
> > > > > Regards,
> > > > > Chaitanya
> > > > >
> > > >
> > >
> >
>
