apex-dev mailing list archives

From Chaitanya Chebolu <chaita...@datatorrent.com>
Subject Re: Inner Join Operator Using Managed State
Date Tue, 17 May 2016 05:05:45 GMT
Hi All,

  Please go through this design and share your suggestions.

Regards,
Chaitanya

On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <mohit@datatorrent.com>
wrote:

> +1.
>
> This is one of the common use cases in batch scenarios, and it will be great
> to have this for streams as well, using Managed State and the
> SpillableMultiMap structure.
>
> Regards,
> Mohit
>
> On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <devendra.vyavahare@gmail.com>
> wrote:
>
> > +1 for the Proposal.
> >
> > This will be useful for joins that need to hold large amounts of
> > intermediate data.
> >
> > ~ Yogi
> >
> > On 9 May 2016 at 14:05, Chaitanya Chebolu <chaitanya@datatorrent.com>
> > wrote:
> >
> > > Hi,
> > >
> > >    The Malhar library has an in-memory join operator, but it can't process
> > > large amounts of data because of checkpointing and memory bottlenecks. To
> > > avoid these, I am proposing an inner join operator using Managed State,
> > > which was recently added to Malhar.
> > >
> > > Details of Inner Join operator, Managed State and Design for Inner Join
> > > operator using Managed State are given below:
> > >
> > > Inner Join Operator
> > > --------------------------
> > > The join operator pairs tuples from two relational streams that match the
> > > join condition and emits the paired tuples.
> > >
> > > For example, let's say S1 and S2 are two input streams and the join
> > > condition is
> > > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > > where a and b are tuples arriving on S1 and S2 respectively, and t1 is the
> > > time period.
> > >
> > > Based on the above join condition, the join operator needs to store tuples
> > > for "t1" time. Let's say JS1 and JS2 are the respective stores where the
> > > tuples from S1 and S2 are kept.
> > >
> > > Let (k1, v1) be a tuple arriving on stream S1. The join operation then
> > > performs the following steps:
> > >
> > >    1. Insert (k1, v1) into the store JS1.
> > >    2. For key k1, get the values from store JS2.
> > >    3. If step 2 returns a non-null set, apply the join conditions.
> > >
> > > A similar procedure is applied if the tuple arrives on stream S2, swapping
> > > JS1 and JS2 in the steps above.
> > >
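
The three steps above can be sketched as follows. This is a minimal in-memory illustration, assuming plain HashMap stores in place of Managed State; the class WindowedInnerJoinSketch, the Tuple type, and the method names are hypothetical and not part of Malhar. Expiry of tuples older than t1 is left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: HashMaps stand in for the stores JS1 and JS2.
final class WindowedInnerJoinSketch {

    /** A tuple with a join key, a payload, and an event time in milliseconds. */
    static final class Tuple {
        final String key;
        final String value;
        final long time;

        Tuple(String key, String value, long time) {
            this.key = key;
            this.value = value;
            this.time = time;
        }
    }

    private final long windowMillis;                               // t1 in the join condition
    private final Map<String, List<Tuple>> js1 = new HashMap<>();  // store for stream S1
    private final Map<String, List<Tuple>> js2 = new HashMap<>();  // store for stream S2

    WindowedInnerJoinSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Handles a tuple arriving on S1; returns "s1Value|s2Value" pairs. */
    List<String> processStream1(Tuple t) {
        return process(t, js1, js2, true);
    }

    /** Handles a tuple arriving on S2; same steps with the stores swapped. */
    List<String> processStream2(Tuple t) {
        return process(t, js2, js1, false);
    }

    private List<String> process(Tuple t,
                                 Map<String, List<Tuple>> own,
                                 Map<String, List<Tuple>> other,
                                 boolean fromS1) {
        // Step 1: insert the tuple into the store of its own stream.
        own.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);

        // Step 2: look up the same key in the other stream's store.
        List<Tuple> candidates = other.get(t.key);
        List<String> joined = new ArrayList<>();
        if (candidates == null) {
            return joined;
        }

        // Step 3: apply the time condition |a.time - b.time| < t1.
        for (Tuple c : candidates) {
            if (Math.abs(t.time - c.time) < windowMillis) {
                joined.add(fromS1 ? t.value + "|" + c.value : c.value + "|" + t.value);
            }
        }
        return joined;
    }
}
```

Calling processStream1 and processStream2 as tuples arrive yields the matched pairs for each new tuple. A real operator would also need to purge tuples older than t1 from both stores.
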
> > > Managed State
> > > --------------------
> > > Managed State is an incrementally checkpointed, fault-tolerant key-value
> > > data structure. For more information about Managed State, please see the
> > > link below:
> > >
> > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > >
> > > Abstract Join Operator
> > > --------------------------------
> > > An abstract implementation of the join operator is available in the
> > > com.datatorrent.lib.join package. JoinStore is the store interface used by
> > > the join operator and is available in the same package.
> > > For more details about the join operator, please see the link below:
> > >
> > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > >
> > > Design of Join Operator Using Managed State
> > > -------------------------------------------------------------
> > > We need to provide a concrete store based on Managed State, which would
> > > look like this:
> > >
> > > public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
> > > {
> > > }
> > >
> > > Data in the Managed State store is held as a map of byte[] to byte[].
> > >
> > > If the value is a list, then inserting (k1, v1) into the store involves
> > > the following steps:
> > > (1) For the key k1, get the value from store JS2. (Here we need to search
> > > all the time buckets if the key is not present in memory.)
> > > (2) Convert the above value to a List.
> > > (3) Add the value v1 to that list.
> > > (4) Convert the new list to a slice.
> > > (5) Insert (k1, new slice) into the Managed State.
> > >
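
The read-modify-write cycle in steps (1) to (5) can be sketched as follows. This is only an illustration under stated assumptions: a HashMap stands in for the byte[]-to-byte[] managed store, ByteBuffer keys are used to get content-based equality, and the class ListValueStoreSketch, its methods, and the length-prefixed encoding are hypothetical rather than Malhar's actual serialization. The search across time buckets is not modelled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a HashMap stands in for the byte[] -> byte[] managed store.
final class ListValueStoreSketch {

    // ByteBuffer keys compare by content, unlike raw byte[] keys.
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();

    /** Appends a value to the list stored under the key. */
    void append(String key, String value) {
        ByteBuffer k = toKey(key);
        List<String> values = decode(store.get(k)); // steps (1) and (2): read and deserialize
        values.add(value);                          // step (3): add the new value
        store.put(k, encode(values));               // steps (4) and (5): serialize and write back
    }

    /** Returns the list stored under the key, or an empty list if absent. */
    List<String> get(String key) {
        return decode(store.get(toKey(key)));
    }

    private static ByteBuffer toKey(String key) {
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    /** Encodes the list as a count followed by length-prefixed strings. */
    static byte[] encode(List<String> values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(values.size());
            for (String v : values) {
                out.writeUTF(v);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decodes bytes written by encode; null input yields an empty list. */
    static List<String> decode(byte[] raw) {
        List<String> values = new ArrayList<>();
        if (raw == null) {
            return values;
        }
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                values.add(in.readUTF());
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Every append rewrites the whole list for the key, so the cost of an insert grows with the number of values already stored under that key.
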
> > > Storing values as a list has been discussed here:
> > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > >
> > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > > structure for lists of values. Let this store be called
> > > SpillableMultiMapJoinStore.
> > >
> > > Based on the above details, the store is chosen according to the key type:
> > >
> > > Key            Store
> > > -----------    --------------------------
> > > Primary        ManagedJoinStore
> > > Not Primary    SpillableMultiMapJoinStore
> > >
> > > The following additional properties would be exposed by the join operator:
> > >
> > >    - isPrimaryKey - whether the key is primary or not.
> > >    - noOfBuckets - number of key buckets. This parameter is required for
> > >    Managed State.
> > >
> > > Please provide your suggestions.
> > >
> > > Regards,
> > > Chaitanya
> > >
> >
>
