apex-dev mailing list archives

From Chandni Singh <singh.chan...@gmail.com>
Subject Re: Inner Join Operator Using Managed State
Date Wed, 18 May 2016 08:34:23 GMT
Hi Chaitanya,

I am NOT suggesting that you use the TimeSlicedBucketedState interface.

I don't see the need for a JoinStore abstraction.

There will be a SpillableArrayListMultimap implementation on which you can
set "ManagedTimeUnifiedStateImpl" as the persistent store.
The SpillableArrayListMultimap API is sufficient for this use case.

You can use this SpillableArrayListMultimap implementation directly in
the Join operator. Here is a simple example:

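class InnerJoinOperator
{
   // Pseudocode: multimap for stream 1, with managed state as the persistent store
   SpillableArrayListMultimap stream1Data =
       new SpillableArrayListMultimap(ManagedTimeUnifiedStateImpl);

   port1.process (tuple) {
       // Append the value to the list stored under the tuple's key
       stream1Data.put(tuple.getKey(), tuple.getVal());
   }
}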


Chandni
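
To make the idea above concrete, here is a minimal Java sketch of a list multimap layered over a pluggable key-value store. It is only an illustration under stated assumptions: FlatStoreSketch, InMemoryFlatStore and ListMultimapSketch are hypothetical names, the in-memory store merely stands in for a persistent store such as ManagedTimeUnifiedStateImpl, and the "key#index" layout is one possible choice, not necessarily what the Malhar implementation does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical flat key-value store; stands in for the persistent store.
interface FlatStoreSketch {
  void put(String key, String value);

  String get(String key); // returns null if the key is absent
}

// Hypothetical in-memory store used only to keep the sketch runnable.
class InMemoryFlatStore implements FlatStoreSketch {
  private final Map<String, String> data = new HashMap<>();

  public void put(String key, String value) {
    data.put(key, value);
  }

  public String get(String key) {
    return data.get(key);
  }
}

// Hypothetical list multimap layered over the flat store.
// Assumes keys do not contain '#', which is used as a separator.
class ListMultimapSketch {
  private final FlatStoreSketch store;

  ListMultimapSketch(FlatStoreSketch store) {
    this.store = store;
  }

  // Number of values currently stored for the key.
  private int size(String key) {
    String s = store.get(key + "#size");
    return s == null ? 0 : Integer.parseInt(s);
  }

  // Appends a value: writes one new element entry and updates the size entry.
  // Existing elements are not read or rewritten.
  void put(String key, String value) {
    int n = size(key);
    store.put(key + "#" + n, value);
    store.put(key + "#size", Integer.toString(n + 1));
  }

  // Returns all values for the key in insertion order (empty if none).
  List<String> get(String key) {
    int n = size(key);
    List<String> values = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      values.add(store.get(key + "#" + i));
    }
    return values;
  }
}
```

With such a structure, the operator only calls put(key, value) on the multimap for its own stream and get(key) on the multimap for the other stream, so a separate store abstraction in the operator may not be needed.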




On Wed, May 18, 2016 at 1:00 AM, Chaitanya Chebolu <
chaitanya@datatorrent.com> wrote:

> Hi Chandni,
>
>    I think you are suggesting the interface "TimeSlicedBucketedState". I
> feel this is tightly coupled with Managed State.
>    In the "TimeSlicedBucketedState" abstraction, the bucketId parameter
> relates to Managed State and is not needed for the join operator.
>
> Regards,
> Chaitanya
>
> On Wed, May 18, 2016 at 12:51 PM, Chandni Singh <singh.chandni@gmail.com>
> wrote:
>
> > Chaitanya,
> >
> > SpillableArrayListMultimap will give you a similar abstraction.
> >
> > Why do we need another abstraction, "Join Store"?
> >
> > Chandni
> >
> > On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <
> > chaitanya@datatorrent.com> wrote:
> >
> > > Chandni,
> > >
> > >    JoinStore is an interface and consists of the following methods:
> > > 1) boolean put(Object key, long time, Object value) => Insert the (key,
> > > value) pair into the store.
> > > 2) List<> getTuples(Object key) => Return the list of values in the
> > > store to which the specified key is mapped.
> > >
> > >    JoinStore is pluggable into the join operator. The join operator
> > > exposes the following properties:
> > > JoinStore leftStore, rightStore.
> > >
> > > By default, leftStore and rightStore would be JoinStore implementations
> > > using spillable data structures over ManagedState. A user who wants to
> > > integrate a different store has to implement JoinStore and set it on
> > > the Join operator.
> > >
> > >
> > > Tim,
> > >
> > >    I am not planning any different implementation.
> > >    Here, I proposed to plug SpillableArrayListMultiMap/SpillableMap
> > > over managed state. I think this is what you are developing. Please
> > > correct me if I am wrong.
> > >
> > > Regards,
> > > Chaitanya
> > >
> > > On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <
> > > timothytiborfarkas@gmail.com> wrote:
> > >
> > > > Chaitanya,
> > > >
> > > > Are you planning to use the SpillableMap and SpillableArrayListMultiMap
> > > > that are in development, or separate implementations? If you are
> > > > planning on using separate implementations, can you please explain why
> > > > they are needed?
> > > >
> > > > Thanks,
> > > > Tim
> > > >
> > > > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <
> > > > chaitanya@datatorrent.com> wrote:
> > > >
> > > > > Hi Chandni,
> > > > >
> > > > >   I am suggesting two stores, categorized by key type:
> > > > > (1) If the key is a primary key, the store would be ManagedJoinStore
> > > > > or a join store using SpillableMap.
> > > > > (2) If the key is not a primary key, the store would be a join store
> > > > > using SpillableArrayListMultiMap.
> > > > >
> > > > > Regards,
> > > > > Chaitanya
> > > > >
> > > > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <
> > > > > singh.chandni@gmail.com> wrote:
> > > > >
> > > > > > Hi Chaitanya,
> > > > > >
> > > > > > In the last discussion, we decided that for the Join operation we
> > > > > > needed the Spillable data structures, specifically
> > > > > > SpillableArrayListMultiMap, for the Join Operator.
> > > > > >
> > > > > > Tim had created a ticket, APEXMALHAR-2070, to address this. The pull
> > > > > > request for an in-memory implementation already exists:
> > > > > > https://github.com/apache/incubator-apex-malhar/pull/262
> > > > > >
> > > > > > As commented on the ticket, Tim is working on the implementation of
> > > > > > SpillableArrayListMultiMap which uses ManagedState.
> > > > > >
> > > > > > The ManagedJoinStore seems to me like a duplicate of these spillable
> > > > > > data structures. I think we should avoid creating multiple things
> > > > > > that provide the same basic functionality.
> > > > > >
> > > > > > Can you please help review the spillable data structures pull
> > > > > > request and point out what you will need for Join that is missing
> > > > > > there?
> > > > > >
> > > > > > Thanks,
> > > > > > Chandni
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <
> > > > > > chaitanya@datatorrent.com> wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > >   Please go through this design and share your suggestions.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Chaitanya
> > > > > > >
> > > > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <
> > > > > > > mohit@datatorrent.com> wrote:
> > > > > > >
> > > > > > > > +1.
> > > > > > > >
> > > > > > > > This is one of the common use cases in batch scenarios, and it
> > > > > > > > will be great to have this in a stream as well, using managed
> > > > > > > > state and the spillableMultiMap structure.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Mohit
> > > > > > > >
> > > > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <
> > > > > > > > devendra.vyavahare@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > +1 for the Proposal.
> > > > > > > > >
> > > > > > > > > This will be useful for joins involving a large amount of data
> > > > > > > > > to be held intermediately.
> > > > > > > > >
> > > > > > > > > ~ Yogi
> > > > > > > > >
> > > > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <
> > > > > > > > > chaitanya@datatorrent.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > >    The Malhar library has an in-memory join operator, which can't
> > > > > > > > > > process large amounts of data because of checkpoint and memory
> > > > > > > > > > bottlenecks. To avoid these, I am proposing an inner join operator
> > > > > > > > > > using Managed State, which was recently added to Malhar.
> > > > > > > > > >
> > > > > > > > > > Details of the Inner Join operator, Managed State, and the design of
> > > > > > > > > > the Inner Join operator using Managed State are given below:
> > > > > > > > > >
> > > > > > > > > > Inner Join Operator
> > > > > > > > > > --------------------------
> > > > > > > > > > The Join Operator pairs tuples from two relational streams that
> > > > > > > > > > match the join condition and emits the paired tuples.
> > > > > > > > > >
> > > > > > > > > > For example, let's say S1 and S2 are two input streams and the join
> > > > > > > > > > condition is
> > > > > > > > > > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > > > > > > > > > where a and b are tuples arriving on S1 and S2 respectively, and t1
> > > > > > > > > > is the time period.
> > > > > > > > > >
> > > > > > > > > > Based on the above join condition, the join operator needs to store
> > > > > > > > > > tuples for "t1" time. Let's say JS1 and JS2 are the respective stores
> > > > > > > > > > where these tuples are kept.
> > > > > > > > > >
> > > > > > > > > > Let (k1, v1) be a tuple arriving on stream S1. The following steps
> > > > > > > > > > are performed for the join operation:
> > > > > > > > > >
> > > > > > > > > >    1. Insert (k1, v1) into the store JS1.
> > > > > > > > > >    2. For key k1, get the values from store JS2.
> > > > > > > > > >    3. If step 2 returns a non-null set, apply the join conditions.
> > > > > > > > > >
> > > > > > > > > > A similar procedure is applied if the tuple arrives on stream S2,
> > > > > > > > > > swapping JS1 and JS2 in the steps above.
> > > > > > > > > >
> > > > > > > > > > Managed State
> > > > > > > > > > --------------------
> > > > > > > > > > Managed State is an incrementally checkpointed, fault-tolerant
> > > > > > > > > > key-value data structure. For more information about Managed State,
> > > > > > > > > > please have a look at the link below:
> > > > > > > > > >
> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > > > > > > > > >
> > > > > > > > > > Abstract Join Operator
> > > > > > > > > > --------------------------------
> > > > > > > > > > An abstract implementation of the Join operator is available in the
> > > > > > > > > > com.datatorrent.lib.join package. JoinStore is the store interface
> > > > > > > > > > used by the join operator and is available in the same package.
> > > > > > > > > > For more details about the join operator, please have a look at the
> > > > > > > > > > link below:
> > > > > > > > > >
> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > > > > > > > > >
> > > > > > > > > > Design of Join Operator Using Managed State
> > > > > > > > > > -------------------------------------------------------------
> > > > > > > > > > We need to provide a concrete store using Managed State, which would
> > > > > > > > > > look like the following:
> > > > > > > > > >
> > > > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl
> > > > > > > > > >     implements JoinStore
> > > > > > > > > > {
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > Data in the Managed store would be in the form of a Map of byte[] to
> > > > > > > > > > byte[].
> > > > > > > > > >
> > > > > > > > > > If the value is a list, then inserting (k1, v1) into the store would
> > > > > > > > > > take the following steps:
> > > > > > > > > > (1) For the key k1, get the value from store JS2. (Here we need to
> > > > > > > > > > search all the time buckets if the key is not present in memory.)
> > > > > > > > > > (2) Convert the above value to a List.
> > > > > > > > > > (3) Add the value v1 to the above list.
> > > > > > > > > > (4) Convert the new list to a slice.
> > > > > > > > > > (5) Insert (k1, new slice) into the Managed State.
> > > > > > > > > >
> > > > > > > > > > Values as a list have been discussed here:
> > > > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > > > > > > > > >
> > > > > > > > > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > > > > > > > > > structure for lists of values. Let this store be
> > > > > > > > > > SpillableMultiMapJoinStore.
> > > > > > > > > >
> > > > > > > > > > Based on the above details, the store is categorized by key type.
> > > > > > > > > >
> > > > > > > > > > Key            Store
> > > > > > > > > > ------------------------------------------
> > > > > > > > > > Primary        ManagedJoinStore
> > > > > > > > > > Not Primary    SpillableMultiMapJoinStore
> > > > > > > > > >
> > > > > > > > > > The following additional properties would be exposed by the Join
> > > > > > > > > > operator:
> > > > > > > > > >
> > > > > > > > > >    - isPrimaryKey - whether the key is primary or not.
> > > > > > > > > >    - noOfBuckets - number of key buckets. This parameter is required
> > > > > > > > > >    for Managed State.
> > > > > > > > > >
> > > > > > > > > > Please provide your suggestions.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Chaitanya
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
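
The join condition and the three join steps from the original proposal at the bottom of this thread can be sketched in Java roughly as follows. This is a minimal illustration, not the Malhar operator: TimedTuple and WindowedJoinSketch are hypothetical names, plain HashMaps stand in for the stores JS1 and JS2, and tuples older than t1 are never expired here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tuple carrying the key, time and value used by the join condition.
final class TimedTuple {
  final String key;
  final long time;
  final String value;

  TimedTuple(String key, long time, String value) {
    this.key = key;
    this.time = time;
    this.value = value;
  }
}

// Hypothetical sketch of the join steps; HashMaps stand in for JS1 and JS2.
class WindowedJoinSketch {
  private final long t1;
  private final Map<String, List<TimedTuple>> js1 = new HashMap<>();
  private final Map<String, List<TimedTuple>> js2 = new HashMap<>();

  WindowedJoinSketch(long t1) {
    this.t1 = t1;
  }

  // Join condition: equal keys and |a.time - b.time| < t1.
  boolean matches(TimedTuple a, TimedTuple b) {
    return a.key.equals(b.key) && Math.abs(a.time - b.time) < t1;
  }

  // Tuple arriving on S1: own store is JS1, other store is JS2.
  List<String> onStream1(TimedTuple a) {
    return process(a, js1, js2, true);
  }

  // Tuple arriving on S2: the same steps with JS1 and JS2 swapped.
  List<String> onStream2(TimedTuple b) {
    return process(b, js2, js1, false);
  }

  private List<String> process(TimedTuple in,
      Map<String, List<TimedTuple>> own,
      Map<String, List<TimedTuple>> other,
      boolean inIsLeft) {
    // Step 1: insert the incoming tuple into its own store.
    own.computeIfAbsent(in.key, k -> new ArrayList<>()).add(in);
    // Step 2: get the values for the same key from the other store.
    List<TimedTuple> candidates =
        other.getOrDefault(in.key, Collections.<TimedTuple>emptyList());
    // Step 3: apply the join condition and collect the paired values,
    // always written as "S1 value,S2 value".
    List<String> out = new ArrayList<>();
    for (TimedTuple c : candidates) {
      if (matches(in, c)) {
        out.add(inIsLeft ? in.value + "," + c.value : c.value + "," + in.value);
      }
    }
    return out;
  }
}
```

A real operator would also need to drop tuples once they are older than t1, which is where a time-bucketed store becomes useful.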
