apex-dev mailing list archives

From Yogi Devendra <devendra.vyavah...@gmail.com>
Subject Re: Inner Join Operator Using Managed State
Date Thu, 12 May 2016 09:08:31 GMT
+1 for the proposal.

This will be useful for joins that need to hold large amounts of intermediate
data.

~ Yogi

On 9 May 2016 at 14:05, Chaitanya Chebolu <chaitanya@datatorrent.com> wrote:

> Hi,
>
>    The Malhar library has an in-memory join operator, which can't process
> large amounts of data because of checkpointing and memory bottlenecks. To
> avoid these, I am proposing an inner join operator using Managed State,
> which was recently added to Malhar.
>
> Details of the inner join operator, Managed State, and the design of an
> inner join operator using Managed State are given below:
>
> Inner Join Operator
> --------------------------
> The join operator pairs tuples from two relational streams that match the
> join condition and emits the paired tuples.
>
> For example, let's say S1 and S2 are two input streams and the join
> condition is
>
>    (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
>
> where a and b are tuples arriving on S1 and S2 respectively, and t1 is the
> time window.
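A minimal Java sketch of the join condition above, written as a predicate over two tuples. The Tuple class, its key/time/value fields, and the use of milliseconds for time are illustrative assumptions, not types from Malhar.

```java
import java.util.Objects;

// Hypothetical tuple type: a join key, an event time in ms, and a payload.
final class Tuple {
  final Object key;
  final long time;
  final Object value;

  Tuple(Object key, long time, Object value) {
    this.key = key;
    this.time = time;
    this.value = value;
  }
}

final class JoinCondition {
  // (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
  static boolean matches(Tuple a, Tuple b, long t1) {
    return Objects.equals(a.key, b.key) && Math.abs(a.time - b.time) < t1;
  }

  public static void main(String[] args) {
    Tuple a = new Tuple("k1", 1_000L, "left");
    Tuple b = new Tuple("k1", 1_500L, "right");
    System.out.println(matches(a, b, 1_000L)); // true: same key, 500 ms apart
    System.out.println(matches(a, b, 500L));   // false: 500 is not < 500
  }
}
```

The comparison is strict, so two tuples exactly t1 apart do not join.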
>
> Given this join condition, the join operator needs to retain tuples for a
> period of t1. Let's say JS1 and JS2 are the stores in which tuples from S1
> and S2, respectively, are kept.
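To make the retention requirement concrete, here is a hypothetical in-memory store (WindowedStore is an illustrative name, not a Malhar class) that keeps tuples per key and evicts any entry that can no longer satisfy |S1.a.time - S2.b.time| < t1. It assumes time only moves forward, i.e. no later tuple carries a timestamp earlier than the value passed to evict().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical per-stream store (e.g. JS1 or JS2) that retains tuples for t1 ms.
final class WindowedStore {
  // A stored value together with its event time in ms.
  static final class Entry {
    final long time;
    final String value;

    Entry(long time, String value) {
      this.time = time;
      this.value = value;
    }
  }

  private final long t1;
  private final Map<String, List<Entry>> byKey = new HashMap<>();

  WindowedStore(long t1) {
    this.t1 = t1;
  }

  void put(String key, long time, String value) {
    byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(time, value));
  }

  List<Entry> get(String key) {
    return byKey.getOrDefault(key, Collections.emptyList());
  }

  // Drop entries that can no longer join with any tuple at time >= now.
  void evict(long now) {
    Iterator<Map.Entry<String, List<Entry>>> it = byKey.entrySet().iterator();
    while (it.hasNext()) {
      List<Entry> entries = it.next().getValue();
      entries.removeIf(e -> now - e.time >= t1);
      if (entries.isEmpty()) {
        it.remove(); // forget keys with no remaining entries
      }
    }
  }

  int size() {
    int n = 0;
    for (List<Entry> entries : byKey.values()) {
      n += entries.size();
    }
    return n;
  }
}
```

An entry at time e can only match a future tuple with time b >= now when b - e < t1, so once now - e >= t1 it is safe to drop.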
>
> Let (k1, v1) be a tuple arriving on stream S1. The join proceeds as
> follows:
>
>    1. Insert (k1, v1) into the store JS1.
>    2. For key k1, get the values from store JS2.
>    3. If step 2 returns a non-empty set, apply the join conditions and emit
>    the matching pairs.
>
> The same procedure applies when the tuple arrives on stream S2, with JS1
> and JS2 swapped in the steps above.
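The three steps, and their mirror image for S2, can be sketched in memory as follows. InnerJoinSketch and its nested Tuple class are hypothetical names; plain HashMaps stand in for JS1 and JS2, and time-window eviction is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the join steps; HashMaps stand in for JS1 and JS2.
final class InnerJoinSketch {
  static final class Tuple {
    final String key;
    final long time;
    final String value;

    Tuple(String key, long time, String value) {
      this.key = key;
      this.time = time;
      this.value = value;
    }
  }

  private final long t1;
  private final Map<String, List<Tuple>> js1 = new HashMap<>();
  private final Map<String, List<Tuple>> js2 = new HashMap<>();
  final List<String> output = new ArrayList<>(); // emitted "s1Value|s2Value" pairs

  InnerJoinSketch(long t1) {
    this.t1 = t1;
  }

  void processS1(Tuple a) { process(a, js1, js2, true); }

  void processS2(Tuple b) { process(b, js2, js1, false); } // JS1 and JS2 swapped

  private void process(Tuple t, Map<String, List<Tuple>> own,
                       Map<String, List<Tuple>> other, boolean fromS1) {
    // Step 1: insert the tuple into its own stream's store.
    own.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
    // Step 2: look up the same key in the other stream's store.
    List<Tuple> candidates = other.get(t.key);
    if (candidates == null) {
      return;
    }
    // Step 3: apply the time condition and emit pairs, S1 value first.
    for (Tuple c : candidates) {
      if (Math.abs(t.time - c.time) < t1) {
        output.add(fromS1 ? t.value + "|" + c.value : c.value + "|" + t.value);
      }
    }
  }
}
```

Because each tuple is stored in its own stream's store before probing the other, a pair is emitted exactly once: by whichever of the two tuples arrives second.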
>
> Managed State
> --------------------
> Managed State is an incrementally checkpointed, fault-tolerant key-value
> data structure. For more information about Managed State, please see the
> link below:
>
>
> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
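As a toy illustration of what incremental checkpointing means (this is not Managed State's actual API or file layout): each checkpoint persists only the keys written since the previous one, and recovery replays the saved deltas in order. IncrementalKvStore is a hypothetical name, and deletes are not modeled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy key-value store whose checkpoints contain only keys changed since the last one.
final class IncrementalKvStore {
  private final Map<String, String> data = new HashMap<>();
  private final Set<String> dirty = new HashSet<>(); // keys written since last checkpoint

  void put(String key, String value) {
    data.put(key, value);
    dirty.add(key);
  }

  String get(String key) {
    return data.get(key);
  }

  // Returns the delta to persist for this checkpoint, then resets the dirty set.
  Map<String, String> checkpoint() {
    Map<String, String> delta = new HashMap<>();
    for (String k : dirty) {
      delta.put(k, data.get(k));
    }
    dirty.clear();
    return delta;
  }

  // Rebuilds state by applying persisted deltas oldest-first.
  static Map<String, String> restore(List<Map<String, String>> deltas) {
    Map<String, String> state = new HashMap<>();
    for (Map<String, String> d : deltas) {
      state.putAll(d);
    }
    return state;
  }
}
```

The point of the design is that checkpoint cost scales with what changed in the window, not with the total amount of state, which is what makes large join state feasible.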
>
> Abstract Join Operator
> --------------------------------
> An abstract implementation of the join operator is available in the
> com.datatorrent.lib.join package. JoinStore, the store interface used by the
> join operator, is in the same package.
> For more details about the join operator, please see the link below:
>
> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
>
> Design of Join Operator Using Managed State
> -------------------------------------------------------------
> We need to provide a concrete store backed by Managed State, which would
> look like the following:
>
> public class ManagedJoinStore extends ManagedTimeStateImpl implements
> JoinStore
> {
> }
>
> Data in the Managed State store would be a map of byte[] keys to byte[]
> values.
>
> If the value is a list, inserting (k1, v1) into the store (JS1) would take
> the following steps:
> (1) For the key k1, get the existing value from the same store, JS1. (If
> the key is not present in memory, this requires searching all the time
> buckets.)
> (2) Convert that value to a list.
> (3) Add the value v1 to the list.
> (4) Convert the new list to a slice.
> (5) Insert (k1, new slice) into the Managed State.
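Steps (1) to (5) amount to a read-modify-write of a serialized list. The sketch below assumes a plain byte[] -> byte[] map and a simple length-prefixed encoding; ByteBuffer wraps the keys because Java arrays compare by identity, roughly the role played by the slice in step (4). ListValueStore and its encoding are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of steps (1)-(5): appending v1 to a list value kept as bytes.
final class ListValueStore {
  // ByteBuffer keys compare by content, unlike raw byte[].
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  private static ByteBuffer key(String k) {
    return ByteBuffer.wrap(k.getBytes(StandardCharsets.UTF_8));
  }

  // Encoding: int count followed by each value as modified UTF-8.
  static byte[] serialize(List<String> values) {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(bos)) {
      out.writeInt(values.size());
      for (String v : values) {
        out.writeUTF(v);
      }
      out.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<String> deserialize(byte[] bytes) {
    List<String> values = new ArrayList<>();
    if (bytes == null) {
      return values; // key not present yet
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      int n = in.readInt();
      for (int i = 0; i < n; i++) {
        values.add(in.readUTF());
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  void append(String k1, String v1) {
    byte[] existing = store.get(key(k1));        // (1) read the current value
    List<String> values = deserialize(existing); // (2) bytes -> list
    values.add(v1);                              // (3) add v1
    byte[] updated = serialize(values);          // (4) list -> bytes
    store.put(key(k1), updated);                 // (5) write back
  }

  List<String> get(String k1) {
    return deserialize(store.get(key(k1)));
  }
}
```

Every append deserializes and re-serializes the whole list, so the cost of an insert grows with the number of values already stored under k1.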
>
> Storing values as lists has been discussed here:
> https://issues.apache.org/jira/browse/APEXMALHAR-2026
>
> Based on that JIRA, I suggest using the SpillableMultiMap data structure
> for lists of values. Let's call this store SpillableMultiMapJoinStore.
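One way a multimap can avoid rewriting the whole list on every insert is to store each value under its own composite key and keep a per-key count. The layout below is a hypothetical sketch, not necessarily how SpillableMultiMap is implemented; FlatMultiMap is an illustrative name, a HashMap stands in for the backing store, and keys are assumed not to contain '#'.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical multimap over a flat key-value store: values live under
// "key#index" and the per-key count under "key#size", so an append writes
// two small entries instead of re-serializing the whole list.
final class FlatMultiMap {
  private final Map<String, String> kv = new HashMap<>(); // stands in for the backing store

  private int size(String key) {
    String s = kv.get(key + "#size");
    return s == null ? 0 : Integer.parseInt(s);
  }

  void put(String key, String value) {
    int n = size(key);
    kv.put(key + "#" + n, value);                   // the new value at index n
    kv.put(key + "#size", Integer.toString(n + 1)); // bump the count
  }

  List<String> get(String key) {
    int n = size(key);
    List<String> values = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      values.add(kv.get(key + "#" + i));
    }
    return values;
  }

  int backingEntries() {
    return kv.size();
  }
}
```

Reads still touch every value for a key, but appends are constant-size writes, which suits a join store where inserts happen on every tuple.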
>
> Based on the above, the store is chosen according to the key type:
>
>    Key            Store
>    -------------------------------------------
>    Primary        ManagedJoinStore
>    Not primary    SpillableMultiMapJoinStore
>
> The join operator would expose the following additional properties:
>
>    - isPrimaryKey - whether the join key is a primary key.
>    - noOfBuckets - the number of key buckets. This parameter is required by
>    Managed State.
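A sketch of how the two proposed properties might drive the operator: isPrimaryKey picks between a single-value store and a multi-value store, and noOfBuckets bounds the bucket id assigned to a key. SimpleJoinStore, PrimaryKeyStore, MultiValueStore, JoinOperatorConfig, the default of 16, and hashing keys into buckets are all assumptions for illustration; they are not Malhar's JoinStore interface or the proposal's final design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed stores; not Malhar's JoinStore.
interface SimpleJoinStore {
  void put(String key, String value);

  List<String> get(String key);
}

// Primary key: at most one value per key, so a key -> value map suffices.
final class PrimaryKeyStore implements SimpleJoinStore {
  private final Map<String, String> map = new HashMap<>();

  public void put(String key, String value) {
    map.put(key, value);
  }

  public List<String> get(String key) {
    String v = map.get(key);
    return v == null ? List.of() : List.of(v);
  }
}

// Non-primary key: many values per key, so a multimap is needed.
final class MultiValueStore implements SimpleJoinStore {
  private final Map<String, List<String>> map = new HashMap<>();

  public void put(String key, String value) {
    map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  public List<String> get(String key) {
    return map.getOrDefault(key, List.of());
  }
}

final class JoinOperatorConfig {
  boolean isPrimaryKey;
  int noOfBuckets = 16; // hypothetical default

  SimpleJoinStore createStore() {
    return isPrimaryKey ? new PrimaryKeyStore() : new MultiValueStore();
  }

  // One simple way to map a key to a bucket in [0, noOfBuckets) (an assumption).
  long bucketOf(String key) {
    return Math.floorMod(key.hashCode(), noOfBuckets);
  }
}
```

Math.floorMod is used instead of % so that negative hash codes still land in a valid bucket.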
>
> Please provide your suggestions.
>
> Regards,
> Chaitanya
>
