apex-dev mailing list archives

From Chaitanya Chebolu <chaita...@datatorrent.com>
Subject Re: Inner Join Operator Using Managed State
Date Wed, 18 May 2016 08:00:22 GMT
Hi Chandni,

   I think you are referring to the "TimeSlicedBucketedState" interface. I
feel it is tightly coupled with Managed State.
   In the "TimeSlicedBucketedState" abstraction, the bucketId parameter is
specific to Managed State and is not needed for the join operator.

Regards,
Chaitanya

On Wed, May 18, 2016 at 12:51 PM, Chandni Singh <singh.chandni@gmail.com>
wrote:

> Chaitanya,
>
> SpillableArrayListMultimap will give you a similar abstraction.
>
> Why do we need another abstraction, "Join Store"?
>
> Chandni
>
> On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <
> chaitanya@datatorrent.com> wrote:
>
> > Chandni,
> >
> >    JoinStore is an interface consisting of the following methods:
> > 1) boolean put(Object key, long time, Object value) => Inserts the
> > (key, value) pair into the store.
> > 2) List<> getTuples(Object key) => Returns the list of values in the
> > store to which the specified key is mapped.
> >
> >    JoinStore is pluggable into the join operator. The join operator
> > exposes the following properties:
> > JoinStore leftStore, rightStore.
> >
> > By default, leftStore and rightStore would be the join store built on
> > spillable data structures over ManagedState. A user who wants to
> > integrate a different store has to implement JoinStore and set it on
> > the join operator.
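
A minimal sketch of what the description above could look like in Java. The element type of the returned list, the in-memory implementation, and the names InMemoryJoinStore and JoinOperatorSketch are assumptions made for illustration; the actual JoinStore interface in com.datatorrent.lib.join may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical rendering of the two methods listed above.
interface JoinStore {
  boolean put(Object key, long time, Object value);

  List<Object> getTuples(Object key);
}

// Illustrative in-memory store; NOT the Managed State backed default.
class InMemoryJoinStore implements JoinStore {
  private final Map<Object, List<Object>> data = new HashMap<>();

  @Override
  public boolean put(Object key, long time, Object value) {
    // 'time' is ignored in this sketch; a time-bucketed store would use it.
    return data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  @Override
  public List<Object> getTuples(Object key) {
    List<Object> values = data.get(key);
    if (values == null) {
      return Collections.emptyList();
    }
    return Collections.unmodifiableList(values);
  }
}

// Hypothetical operator shell exposing the two pluggable store properties.
class JoinOperatorSketch {
  private JoinStore leftStore = new InMemoryJoinStore();
  private JoinStore rightStore = new InMemoryJoinStore();

  public JoinStore getLeftStore() { return leftStore; }

  public void setLeftStore(JoinStore store) { this.leftStore = store; }

  public JoinStore getRightStore() { return rightStore; }

  public void setRightStore(JoinStore store) { this.rightStore = store; }
}
```

A user-supplied store would then be set with setLeftStore(...) or setRightStore(...) before the operator starts processing.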
> >
> >
> > Tim,
> >
> >    I am not planning a different implementation.
> >    Here, I proposed to plug in the SpillableArrayListMultiMap/SpillableMap
> > over managed state. I think this is what you are developing. Please
> > correct me if I am wrong.
> >
> > Regards,
> > Chaitanya
> >
> > On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <
> > timothytiborfarkas@gmail.com> wrote:
> >
> > > Chaitanya,
> > >
> > > Are you planning to use the SpillableMap and SpillableArrayListMultiMap
> > > that are in development, or separate implementations? If you are
> > > planning on using separate implementations, can you please explain why
> > > they are needed?
> > >
> > > Thanks,
> > > Tim
> > >
> > > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <
> > > chaitanya@datatorrent.com> wrote:
> > >
> > > > Hi Chandni,
> > > >
> > > >   I am suggesting two stores, categorized by key type:
> > > > (1) If the key is a primary key, the store would be ManagedJoinStore or
> > > > a join store using SpillableMap.
> > > > (2) If the key is not a primary key, the store would be a join store
> > > > using SpillableArrayListMultiMap.
> > > >
> > > > Regards,
> > > > Chaitanya
> > > >
> > > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <singh.chandni@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Chaitanya,
> > > > >
> > > > > In the last discussion, we decided that for the Join operation we
> > > > > needed the Spillable data structures, specifically
> > > > > SpillableArrayListMultiMap, for the Join operator.
> > > > >
> > > > > Tim had created a ticket, APEXMALHAR-2070, to address this. The pull
> > > > > request for an in-memory implementation already exists:
> > > > > https://github.com/apache/incubator-apex-malhar/pull/262
> > > > >
> > > > > As commented on the ticket, Tim is working on the implementation of
> > > > > SpillableArrayListMultiMap which uses ManagedState.
> > > > >
> > > > > The ManagedJoinStore seems to me like a duplicate of these spillable
> > > > > data structures. I think we should avoid creating multiple things that
> > > > > provide the same basic functionality.
> > > > >
> > > > > Can you please help review the spillable data structures pull request
> > > > > and point out what you will need for Join that is missing there?
> > > > >
> > > > > Thanks,
> > > > > Chandni
> > > > >
> > > > >
> > > > >
> > > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <
> > > > > chaitanya@datatorrent.com> wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > >   Please go through this design and share your suggestions.
> > > > > >
> > > > > > Regards,
> > > > > > Chaitanya
> > > > > >
> > > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <mohit@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > +1.
> > > > > > >
> > > > > > > This is one of the common use cases in batch scenarios, and it will
> > > > > > > be great to have this in a stream as well using managed state,
> > > > > > > spillableMultiMap structure.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mohit
> > > > > > >
> > > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <
> > > > > > > devendra.vyavahare@gmail.com> wrote:
> > > > > > >
> > > > > > > > +1 for the Proposal.
> > > > > > > >
> > > > > > > > This will be useful for joins involving large amounts of data to be
> > > > > > > > held intermediately.
> > > > > > > >
> > > > > > > > ~ Yogi
> > > > > > > >
> > > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <chaitanya@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > >    The Malhar library has an in-memory join operator, which can't
> > > > > > > > > process large amounts of data because of checkpoint and memory
> > > > > > > > > bottlenecks. To avoid these, I am proposing an inner join operator
> > > > > > > > > using Managed State, which was recently added to Malhar.
> > > > > > > > >
> > > > > > > > > Details of the inner join operator, Managed State, and the design of
> > > > > > > > > the inner join operator using Managed State are given below:
> > > > > > > > >
> > > > > > > > > Inner Join Operator
> > > > > > > > > --------------------------
> > > > > > > > > The Join operator pairs tuples from two relational streams that
> > > > > > > > > match the join condition and emits the paired tuples.
> > > > > > > > >
> > > > > > > > > For example, let's say S1 and S2 are two input streams and the join
> > > > > > > > > condition is
> > > > > > > > > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > > > > > > > > where a and b are tuples arriving on S1 and S2 respectively, and t1
> > > > > > > > > is the time period.
> > > > > > > > >
> > > > > > > > > Based on the above join condition, the join operator needs to store
> > > > > > > > > tuples for "t1" time. Let's say JS1 and JS2 are the respective stores
> > > > > > > > > where these tuples are kept.
> > > > > > > > >
> > > > > > > > > Let (k1, v1) be the tuple arriving on stream S1. The steps for the
> > > > > > > > > join operation are:
> > > > > > > > >
> > > > > > > > >    1. Insert (k1, v1) into the store JS1.
> > > > > > > > >    2. For key k1, get the values from store JS2.
> > > > > > > > >    3. If step 2 returns a non-null set, apply the join conditions.
> > > > > > > > >
> > > > > > > > > A similar procedure is applied if the tuple arrives on stream S2,
> > > > > > > > > swapping JS1 and JS2 in the steps above.
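
The three steps and the time condition above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class TimedJoinSketch, its Entry type, and the use of String keys and values are hypothetical, both stores are ordinary in-memory maps rather than Managed State, and expiry of tuples older than t1 is not implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the join steps; not Malhar code.
class TimedJoinSketch {
  // A stored tuple: its value and its time.
  static final class Entry {
    final String value;
    final long time;

    Entry(String value, long time) {
      this.value = value;
      this.time = time;
    }
  }

  private final Map<String, List<Entry>> js1 = new HashMap<>();
  private final Map<String, List<Entry>> js2 = new HashMap<>();
  private final long t1;

  TimedJoinSketch(long t1) {
    this.t1 = t1;
  }

  // Tuple arriving on S1: insert into JS1, probe JS2.
  List<String[]> onS1(String key, String value, long time) {
    return process(js1, js2, key, value, time, true);
  }

  // Tuple arriving on S2: same steps with JS1 and JS2 swapped.
  List<String[]> onS2(String key, String value, long time) {
    return process(js2, js1, key, value, time, false);
  }

  private List<String[]> process(Map<String, List<Entry>> own,
      Map<String, List<Entry>> other, String key, String value,
      long time, boolean fromS1) {
    // Step 1: insert the tuple into its own store.
    own.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(value, time));
    // Step 2: look up the same key in the other store.
    List<Entry> candidates = other.get(key);
    List<String[]> joined = new ArrayList<>();
    if (candidates == null) {
      return joined;
    }
    // Step 3: apply the time condition |a.time - b.time| < t1.
    for (Entry c : candidates) {
      if (Math.abs(time - c.time) < t1) {
        // Emit pairs as {S1 value, S2 value} regardless of arrival side.
        joined.add(fromS1 ? new String[] {value, c.value}
            : new String[] {c.value, value});
      }
    }
    return joined;
  }
}
```

Each call returns the pairs produced by the arriving tuple, always ordered as (S1 value, S2 value).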
> > > > > > > > >
> > > > > > > > > Managed State
> > > > > > > > > --------------------
> > > > > > > > > Managed State is an incrementally checkpointed, fault-tolerant
> > > > > > > > > key-value data structure. For more information about Managed State,
> > > > > > > > > please have a look at the link below:
> > > > > > > > >
> > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > > > > > > > >
> > > > > > > > > Abstract Join Operator
> > > > > > > > > --------------------------------
> > > > > > > > > An abstract implementation of the Join operator is available in the
> > > > > > > > > com.datatorrent.lib.join package. JoinStore is a store interface
> > > > > > > > > used by the join operator and is available in the same package.
> > > > > > > > > For more details about the join operator, please have a look at the
> > > > > > > > > link below:
> > > > > > > > >
> > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > > > > > > > >
> > > > > > > > > Design of Join Operator Using Managed State
> > > > > > > > > -------------------------------------------------------------
> > > > > > > > > We need to provide a concrete store using Managed State, which would
> > > > > > > > > look like this:
> > > > > > > > >
> > > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
> > > > > > > > > {
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > Data in the Managed store would be in the form of a map of byte[] to
> > > > > > > > > byte[].
> > > > > > > > >
> > > > > > > > > If the value is a list, then inserting (k1, v1) into the store would
> > > > > > > > > involve the following steps:
> > > > > > > > > (1) For the key k1, get the value from store JS2. (Here we need to
> > > > > > > > > search all the time buckets if the key is not present in memory.)
> > > > > > > > > (2) Convert the above value to a List.
> > > > > > > > > (3) Add the value v1 to the above list.
> > > > > > > > > (4) Convert the new list to a slice.
> > > > > > > > > (5) Insert (k1, new slice) into the Managed state.
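
To make the cost of this read-modify-write cycle concrete, here is a rough sketch of steps (1) to (5) over a byte[]-to-byte[] map. Everything in it is an assumption for illustration: ListValueStoreSketch is a hypothetical class, a HashMap keyed by ByteBuffer stands in for Managed State, a plain byte[] stands in for the slice, and the length-prefixed encoding is arbitrary.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a byte[] -> byte[] store holding list values.
class ListValueStoreSketch {
  // ByteBuffer keys give content-based equality, unlike raw byte[] keys.
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  // Serialize a list as a count followed by each element.
  static byte[] encode(List<String> values) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(values.size());
      for (String v : values) {
        out.writeUTF(v);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Deserialize; a missing value (null) decodes to an empty list.
  static List<String> decode(byte[] raw) {
    List<String> values = new ArrayList<>();
    if (raw == null) {
      return values;
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
      int count = in.readInt();
      for (int i = 0; i < count; i++) {
        values.add(in.readUTF());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return values;
  }

  private static ByteBuffer toKey(String key) {
    return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
  }

  void append(String key, String value) {
    ByteBuffer k = toKey(key);
    byte[] existing = store.get(k);          // (1) read the current value
    List<String> values = decode(existing);  // (2) convert it to a list
    values.add(value);                       // (3) add the new value
    byte[] serialized = encode(values);      // (4) convert the list back to bytes
    store.put(k, serialized);                // (5) write the whole list back
  }

  List<String> get(String key) {
    return decode(store.get(toKey(key)));
  }
}
```

Every append re-reads and re-writes the entire list for the key, which is the overhead that motivates a multimap-style structure for non-unique keys.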
> > > > > > > > >
> > > > > > > > > Values as a list has been discussed here:
> > > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > > > > > > > >
> > > > > > > > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > > > > > > > > structure for lists of values. Let this store be
> > > > > > > > > SpillableMultiMapJoinStore.
> > > > > > > > >
> > > > > > > > > Based on the above details, the store is chosen according to key type.
> > > > > > > > >
> > > > > > > > > Key              Store
> > > > > > > > > --------------------------------------------
> > > > > > > > > Primary          ManagedJoinStore
> > > > > > > > > Not Primary      SpillableMultiMapJoinStore
> > > > > > > > >
> > > > > > > > > The following additional properties would be exposed by the Join
> > > > > > > > > operator:
> > > > > > > > >
> > > > > > > > >    - isPrimaryKey - whether the key is primary or not.
> > > > > > > > >    - noOfBuckets - number of key buckets. This parameter is required
> > > > > > > > >    for Managed State.
> > > > > > > > >
> > > > > > > > > Please provide your suggestions.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Chaitanya
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
