apex-dev mailing list archives

From Chandni Singh <singh.chan...@gmail.com>
Subject Re: Inner Join Operator Using Managed State
Date Fri, 20 May 2016 05:29:16 GMT
Hey Tim,

As I understand it, the Join operator needs a common abstraction for a
SpillableMap and a SpillableArrayListMultimap.

I suggested using SpillableComplexComponent. Is this correct?

Thanks,
Chandni

On Wed, May 18, 2016 at 1:34 AM, Chandni Singh <singh.chandni@gmail.com>
wrote:

> Hi Chaitanya,
>
> I am NOT suggesting that you use the interface TimeSlicedBucketedState.
>
> I don't see the need for a JoinStore abstraction.
>
> There will be a SpillableArrayListMultimap implementation on which you can
> set "ManagedTimeUnifiedStateImpl" as the persistent store.
> The API of SpillableArrayListMultimap is sufficient for this use case.
>
> You can use this SpillableArrayListMultimap implementation directly in
> the Join operator. Here is a simple example:
>
> class InnerJoinOperator
> {
>    // Join key -> all values seen on port1, persisted through a
>    // ManagedTimeUnifiedStateImpl instance (managedState).
>    SpillableArrayListMultiMap stream1Data =
>        new SpillableArrayListMultiMap(managedState);
>
>    port1.process(tuple) {
>        stream1Data.put(tuple.getKey(), tuple.getVal());
>    }
> }
>
>
> Chandni
>
>
>
>
> On Wed, May 18, 2016 at 1:00 AM, Chaitanya Chebolu <
> chaitanya@datatorrent.com> wrote:
>
>> Hi Chandni,
>>
>>    I think you are suggesting the interface "TimeSlicedBucketedState". I
>> feel it is tightly coupled with Managed State: in the
>> "TimeSlicedBucketedState" abstraction, the bucketId parameter relates to
>> Managed State and is not needed for the join operator.
>>
>> Regards,
>> Chaitanya
>>
>> On Wed, May 18, 2016 at 12:51 PM, Chandni Singh <singh.chandni@gmail.com>
>> wrote:
>>
>> > Chaitanya,
>> >
>> > SpillableArrayListMultimap will give you a similar abstraction.
>> >
>> > Why do we need another abstraction, "JoinStore"?
>> >
>> > Chandni
>> >
>> > On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <
>> > chaitanya@datatorrent.com> wrote:
>> >
>> > > Chandni,
>> > >
>> > >    JoinStore is an interface with the following methods:
>> > > 1) boolean put(Object key, long time, Object value) => Insert the (key,
>> > > value) pair into the store.
>> > > 2) List<> getTuples(Object key) => Return the list of values in the store
>> > > to which the specified key is mapped.
>> > >
>> > >    JoinStore is pluggable into the join operator. The join operator
>> > > exposes the following properties:
>> > > JoinStore leftStore, rightStore.
>> > >
>> > > By default, leftStore and rightStore would be JoinStore implementations
>> > > using spillable data structures over ManagedState. To integrate a
>> > > different store, the user implements JoinStore and sets it on the join
>> > > operator.
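A minimal sketch of the JoinStore interface as described in the message above, assuming List<Object> as the return type of getTuples (the message leaves the element type unspecified). InMemoryJoinStore and JoinOperatorSketch are hypothetical names for illustration only; the proposal's default store would be spillable and backed by ManagedState, which this sketch replaces with a HashMap so that it stays self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the proposed JoinStore interface (element type assumed).
interface JoinStore
{
  // Insert the (key, value) pair; time is the tuple's event time.
  boolean put(Object key, long time, Object value);

  // Return the values stored under key, or an empty list if there are none.
  List<Object> getTuples(Object key);
}

// Hypothetical in-memory store; time is accepted but not used here.
class InMemoryJoinStore implements JoinStore
{
  private final Map<Object, List<Object>> data = new HashMap<>();

  @Override
  public boolean put(Object key, long time, Object value)
  {
    return data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  @Override
  public List<Object> getTuples(Object key)
  {
    return data.getOrDefault(key, Collections.emptyList());
  }
}

// Hypothetical operator skeleton exposing leftStore and rightStore as properties.
class JoinOperatorSketch
{
  private JoinStore leftStore = new InMemoryJoinStore();
  private JoinStore rightStore = new InMemoryJoinStore();

  public JoinStore getLeftStore() { return leftStore; }
  public void setLeftStore(JoinStore leftStore) { this.leftStore = leftStore; }
  public JoinStore getRightStore() { return rightStore; }
  public void setRightStore(JoinStore rightStore) { this.rightStore = rightStore; }
}
```

In this sketch, plugging in a different store amounts to calling setLeftStore/setRightStore with another JoinStore implementation.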
>> > >
>> > >
>> > > Tim,
>> > >
>> > >    I am not planning a different implementation.
>> > >    I proposed plugging in SpillableArrayListMultiMap/SpillableMap over
>> > > managed state. I think this is what you are developing. Please correct
>> > > me if I am wrong.
>> > >
>> > > Regards,
>> > > Chaitanya
>> > >
>> > > On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <
>> > > timothytiborfarkas@gmail.com> wrote:
>> > >
>> > > > Chaitanya,
>> > > >
>> > > > Are you planning to use the SpillableMap and SpillableArrayListMultiMap
>> > > > that are in development, or separate implementations? If you are
>> > > > planning to use separate implementations, can you please explain why
>> > > > they are needed?
>> > > >
>> > > > Thanks,
>> > > > Tim
>> > > >
>> > > > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <
>> > > > chaitanya@datatorrent.com> wrote:
>> > > >
>> > > > > Hi Chandni,
>> > > > >
>> > > > >   I am suggesting two stores, categorized by key type:
>> > > > > (1) If the key is a primary key, the store would be ManagedJoinStore
>> > > > > or a JoinStore using SpillableMap.
>> > > > > (2) If the key is not a primary key, the store would be a JoinStore
>> > > > > using SpillableArrayListMultiMap.
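The practical difference between the two categories can be sketched with plain Java maps, assuming a primary key identifies at most one tuple per stream. PrimaryKeyStore and MultiValueStore are hypothetical in-memory stand-ins for the SpillableMap-based and SpillableArrayListMultiMap-based stores, not Malhar classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KeyTypeStores
{
  // Primary key: at most one tuple per key, so a plain map suffices
  // (the role proposed for ManagedJoinStore or a SpillableMap-based store).
  static final class PrimaryKeyStore<K, V>
  {
    private final Map<K, V> map = new HashMap<>();

    void put(K key, V value) { map.put(key, value); }

    V get(K key) { return map.get(key); }
  }

  // Non-primary key: many tuples may share a key, so values accumulate per key
  // (the role proposed for a SpillableArrayListMultiMap-based store).
  static final class MultiValueStore<K, V>
  {
    private final Map<K, List<V>> map = new HashMap<>();

    void put(K key, V value) { map.computeIfAbsent(key, k -> new ArrayList<>()).add(value); }

    List<V> get(K key) { return map.getOrDefault(key, new ArrayList<>()); }
  }

  private KeyTypeStores() {}
}
```

With a primary key, a second put for the same key simply replaces the first value in this sketch; with a non-primary key, both values are kept and both take part in the join.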
>> > > > >
>> > > > > Regards,
>> > > > > Chaitanya
>> > > > >
>> > > > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <singh.chandni@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Chaitanya,
>> > > > > >
>> > > > > > In the last discussion, we decided that the Join operator needs the
>> > > > > > spillable data structures, specifically SpillableArrayListMultiMap.
>> > > > > >
>> > > > > > Tim created ticket APEXMALHAR-2070 to address this. The pull request
>> > > > > > for an in-memory implementation already exists:
>> > > > > > https://github.com/apache/incubator-apex-malhar/pull/262
>> > > > > >
>> > > > > > As commented on the ticket, Tim is working on an implementation of
>> > > > > > SpillableArrayListMultiMap that uses ManagedState.
>> > > > > >
>> > > > > > The ManagedJoinStore seems to me like a duplicate of these spillable
>> > > > > > data structures. I think we should avoid creating multiple things that
>> > > > > > provide the same basic functionality.
>> > > > > >
>> > > > > > Can you please help review the spillable data structures pull request
>> > > > > > and point out what is missing there that you will need for Join?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Chandni
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <
>> > > > > > chaitanya@datatorrent.com> wrote:
>> > > > > >
>> > > > > > > Hi All,
>> > > > > > >
>> > > > > > >   Please go through this design and share your suggestions.
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Chaitanya
>> > > > > > >
>> > > > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <mohit@datatorrent.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > +1.
>> > > > > > > >
>> > > > > > > > This is one of the common use cases in batch scenarios, and it
>> > > > > > > > would be great to have it in streaming as well, using managed
>> > > > > > > > state and the spillableMultiMap structure.
>> > > > > > > >
>> > > > > > > > Regards,
>> > > > > > > > Mohit
>> > > > > > > >
>> > > > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <devendra.vyavahare@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > +1 for the proposal.
>> > > > > > > > >
>> > > > > > > > > This will be useful for joins that need to hold a large amount
>> > > > > > > > > of intermediate data.
>> > > > > > > > >
>> > > > > > > > > ~ Yogi
>> > > > > > > > >
>> > > > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <chaitanya@datatorrent.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi ,
>> > > > > > > > > >
>> > > > > > > > > >    The Malhar library has an in-memory join operator, which can't
>> > > > > > > > > > process large amounts of data because of checkpointing and memory
>> > > > > > > > > > bottlenecks. To avoid these, I am proposing an inner join operator
>> > > > > > > > > > using Managed State, which was recently added to Malhar.
>> > > > > > > > > >
>> > > > > > > > > > Details of the inner join operator, Managed State, and the design of
>> > > > > > > > > > an inner join operator using Managed State are given below:
>> > > > > > > > > >
>> > > > > > > > > > Inner Join Operator
>> > > > > > > > > > --------------------------
>> > > > > > > > > > The join operator pairs tuples from two relational streams that
>> > > > > > > > > > match the join condition and emits the paired tuples.
>> > > > > > > > > >
>> > > > > > > > > > For example, let S1 and S2 be two input streams with the join
>> > > > > > > > > > condition
>> > > > > > > > > >     (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1,
>> > > > > > > > > > where a and b are tuples arriving on S1 and S2 respectively, and t1
>> > > > > > > > > > is the time period.
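The join condition above can be written as a predicate. This is a sketch assuming each tuple carries a key and a long timestamp in the same unit as t1; TimedTuple and JoinCondition are hypothetical names.

```java
import java.util.Objects;

// Hypothetical tuple: a join key plus an event time (same unit as t1).
final class TimedTuple
{
  final Object key;
  final long time;

  TimedTuple(Object key, long time)
  {
    this.key = key;
    this.time = time;
  }
}

final class JoinCondition
{
  // (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
  static boolean matches(TimedTuple a, TimedTuple b, long t1)
  {
    return Objects.equals(a.key, b.key) && Math.abs(a.time - b.time) < t1;
  }

  private JoinCondition() {}
}
```

Because the inequality is strict, two tuples exactly t1 apart do not match.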
>> > > > > > > > > >
>> > > > > > > > > > Based on the above join condition, the join operator needs to store
>> > > > > > > > > > tuples for time t1. Let JS1 and JS2 be the respective stores in which
>> > > > > > > > > > the tuples from S1 and S2 are kept.
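One way to hold tuples only for t1 is to evict any entry whose timestamp is at least t1 older than the current time, since it can no longer satisfy |a.time - b.time| < t1. The sketch below assumes tuples arrive in non-decreasing time order and keeps everything in memory; ExpiringJoinStore is a hypothetical name and is not meant to reflect how Managed State expires its time buckets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ExpiringJoinStore
{
  private static final class Entry
  {
    final Object key;
    final long time;
    final Object value;

    Entry(Object key, long time, Object value)
    {
      this.key = key;
      this.time = time;
      this.value = value;
    }
  }

  private final long t1;
  private final Deque<Entry> byTime = new ArrayDeque<>();          // arrival order
  private final Map<Object, Deque<Entry>> byKey = new HashMap<>(); // per-key arrival order

  ExpiringJoinStore(long t1) { this.t1 = t1; }

  void put(Object key, long time, Object value)
  {
    Entry e = new Entry(key, time, value);
    byTime.addLast(e);
    byKey.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(e);
  }

  // Drop entries that cannot match any tuple with time >= now.
  void evict(long now)
  {
    while (!byTime.isEmpty() && byTime.peekFirst().time <= now - t1) {
      Entry old = byTime.pollFirst();
      Deque<Entry> sameKey = byKey.get(old.key);
      sameKey.pollFirst(); // the oldest entry for this key is the one evicted
      if (sameKey.isEmpty()) {
        byKey.remove(old.key);
      }
    }
  }

  List<Object> getValues(Object key)
  {
    List<Object> out = new ArrayList<>();
    for (Entry e : byKey.getOrDefault(key, new ArrayDeque<>())) {
      out.add(e.value);
    }
    return out;
  }
}
```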
>> > > > > > > > > >
>> > > > > > > > > > Let (k1, v1) be a tuple arriving on stream S1. The join operation
>> > > > > > > > > > then takes the following steps:
>> > > > > > > > > >
>> > > > > > > > > >    1. Insert (k1, v1) into store JS1.
>> > > > > > > > > >    2. For key k1, get the values from store JS2.
>> > > > > > > > > >    3. If step 2 returns a non-empty set, apply the join conditions.
>> > > > > > > > > >
>> > > > > > > > > > The same procedure applies to a tuple on stream S2, with JS1 and JS2
>> > > > > > > > > > swapped in the steps above.
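The three steps above can be sketched in memory as a symmetric hash join. This assumes String keys and values, uses HashMaps in place of JS1 and JS2, and leaves out the time condition and expiry; SymmetricInnerJoin is a hypothetical name, and joined pairs are rendered as "s1Value|s2Value" strings only for readability.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SymmetricInnerJoin
{
  private final Map<String, List<String>> js1 = new HashMap<>(); // tuples from S1
  private final Map<String, List<String>> js2 = new HashMap<>(); // tuples from S2

  List<String> processS1(String key, String value) { return process(key, value, js1, js2, true); }

  List<String> processS2(String key, String value) { return process(key, value, js2, js1, false); }

  private static List<String> process(String key, String value, Map<String, List<String>> own,
      Map<String, List<String>> other, boolean fromS1)
  {
    // Step 1: insert (key, value) into this stream's own store.
    own.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    // Step 2: look up the values stored for the same key on the other stream.
    List<String> matches = other.get(key);
    List<String> joined = new ArrayList<>();
    // Step 3: pair up matches; a time-window check would also go here.
    if (matches != null) {
      for (String m : matches) {
        joined.add(fromS1 ? value + "|" + m : m + "|" + value);
      }
    }
    return joined;
  }
}
```

Because every tuple is stored before probing the other side, each matching pair is emitted exactly once, by whichever of the two tuples arrives second.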
>> > > > > > > > > >
>> > > > > > > > > > Managed State
>> > > > > > > > > > --------------------
>> > > > > > > > > > Managed State is an incrementally checkpointed, fault-tolerant
>> > > > > > > > > > key-value data structure. For more information about Managed State,
>> > > > > > > > > > see:
>> > > > > > > > > >
>> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
>> > > > > > > > > >
>> > > > > > > > > > Abstract Join Operator
>> > > > > > > > > > --------------------------------
>> > > > > > > > > > An abstract implementation of the join operator is available in the
>> > > > > > > > > > com.datatorrent.lib.join package. JoinStore, the store interface used
>> > > > > > > > > > by the join operator, is available in the same package.
>> > > > > > > > > > For more details about the join operator, see:
>> > > > > > > > > >
>> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
>> > > > > > > > > >
>> > > > > > > > > > Design of Join Operator Using Managed State
>> > > > > > > > > > -------------------------------------------------------------
>> > > > > > > > > > We need to provide a concrete store using Managed State, which would
>> > > > > > > > > > look like this:
>> > > > > > > > > >
>> > > > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
>> > > > > > > > > > {
>> > > > > > > > > > }
>> > > > > > > > > >
>> > > > > > > > > > Data in the managed store is kept as a map of byte[] to byte[].
>> > > > > > > > > >
>> > > > > > > > > > If the value is a list, inserting (k1, v1) into the store takes the
>> > > > > > > > > > following steps:
>> > > > > > > > > > (1) For key k1, get the current value from the same store. (If the
>> > > > > > > > > > key is not present in memory, it must be searched for in all the
>> > > > > > > > > > time buckets.)
>> > > > > > > > > > (2) Convert this value to a List.
>> > > > > > > > > > (3) Add v1 to the list.
>> > > > > > > > > > (4) Convert the new list to a slice.
>> > > > > > > > > > (5) Insert (k1, new slice) into Managed State.
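The read-modify-write above can be sketched against a plain byte[]-to-byte[] map. This sketch assumes String keys and values, uses java.io Data streams for serialization in place of Malhar's Slice, and does not model the search across time buckets in step (1); ListValueStore is a hypothetical name. Keys are wrapped in ByteBuffer because Java arrays use identity equality as HashMap keys.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ListValueStore
{
  // Stand-in for a byte[] -> byte[] key-value store.
  private final Map<ByteBuffer, byte[]> state = new HashMap<>();

  void append(String key, String value)
  {
    ByteBuffer k = ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    byte[] current = state.get(k);                                                // (1) read
    List<String> list = current == null ? new ArrayList<String>() : decode(current); // (2) to List
    list.add(value);                                                              // (3) add v1
    state.put(k, encode(list));                                                   // (4) serialize, (5) write
  }

  List<String> get(String key)
  {
    byte[] current = state.get(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)));
    return current == null ? new ArrayList<String>() : decode(current);
  }

  private static byte[] encode(List<String> list)
  {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(list.size());
      for (String s : list) {
        out.writeUTF(s);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private static List<String> decode(byte[] data)
  {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int n = in.readInt();
      List<String> list = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        list.add(in.readUTF());
      }
      return list;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this sketch every insert deserializes and rewrites the whole list for that key.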
>> > > > > > > > > >
>> > > > > > > > > > Storing values as a list has been discussed here:
>> > > > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
>> > > > > > > > > >
>> > > > > > > > > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
>> > > > > > > > > > structure for lists of values. Let this store be
>> > > > > > > > > > SpillableMultiMapJoinStore.
>> > > > > > > > > >
>> > > > > > > > > > Based on the above details, the store is categorized by key type:
>> > > > > > > > > >
>> > > > > > > > > >    Key           Store
>> > > > > > > > > >    -----------   --------------------------
>> > > > > > > > > >    Primary       ManagedJoinStore
>> > > > > > > > > >    Not Primary   SpillableMultiMapJoinStore
>> > > > > > > > > >
>> > > > > > > > > > The join operator would expose the following additional properties:
>> > > > > > > > > >
>> > > > > > > > > >    - isPrimaryKey - whether the key is a primary key.
>> > > > > > > > > >    - noOfBuckets - the number of key buckets. This parameter is
>> > > > > > > > > >      required by Managed State.
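A sketch of how the two proposed properties might be used, assuming store selection follows the key-type table above and that a key's bucket is derived from its hash code modulo noOfBuckets. JoinStoreConfig, storeType and bucketFor are hypothetical; the bucket assignment is a common hashing choice and not necessarily what Managed State does.

```java
import java.util.Objects;

final class JoinStoreConfig
{
  private boolean isPrimaryKey;
  private int noOfBuckets = 1; // default value is an assumption

  void setIsPrimaryKey(boolean isPrimaryKey) { this.isPrimaryKey = isPrimaryKey; }

  void setNoOfBuckets(int noOfBuckets)
  {
    if (noOfBuckets <= 0) {
      throw new IllegalArgumentException("noOfBuckets must be positive");
    }
    this.noOfBuckets = noOfBuckets;
  }

  // Store category from the key-type table.
  String storeType()
  {
    return isPrimaryKey ? "ManagedJoinStore" : "SpillableMultiMapJoinStore";
  }

  // Map a key to a bucket in [0, noOfBuckets); floorMod keeps negative hashes in range.
  int bucketFor(Object key)
  {
    return Math.floorMod(Objects.hashCode(key), noOfBuckets);
  }
}
```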
>> > > > > > > > > >
>> > > > > > > > > > Please provide your suggestions.
>> > > > > > > > > >
>> > > > > > > > > > Regards,
>> > > > > > > > > > Chaitanya
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
