Subject: Re: Inner Join Operator Using Managed State
From: Timothy Farkas
To: dev@apex.incubator.apache.org
Date: Tue, 17 May 2016 10:54:28 -0700

Chaitanya,

Are you planning to use the SpillableMap and SpillableArrayListMultiMap that are in development, or separate implementations? If you are planning to use separate implementations, can you please explain why they are needed?

Thanks,
Tim

On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <chaitanya@datatorrent.com> wrote:

> Hi Chandni,
>
> I am suggesting two stores, categorized by key type:
> (1) If the key is a primary key, the store would be ManagedJoinStore or a join store using SpillableMap.
> (2) If the key is not a primary key, the store would be a join store using SpillableArrayListMultiMap.
>
> Regards,
> Chaitanya
>
> On Tue, May 17, 2016 at 10:46 PM, Chandni Singh wrote:
>
> > Hi Chaitanya,
> >
> > In the last discussion, we decided that the Join operator needs the spillable data structures, specifically SpillableArrayListMultiMap.
> >
> > Tim created ticket APEXMALHAR-2070 to address this. The pull request for an in-memory implementation already exists:
> > https://github.com/apache/incubator-apex-malhar/pull/262
> >
> > As commented on the ticket, Tim is working on an implementation of SpillableArrayListMultiMap that uses ManagedState.
> >
> > The ManagedJoinStore seems to me like a duplicate of these spillable data structures. I think we should avoid creating multiple things that provide the same basic functionality.
> >
> > Can you please help review the spillable data structures pull request and point out what you will need for Join that is missing there?
> >
> > Thanks,
> > Chandni
> >
> > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <chaitanya@datatorrent.com> wrote:
> >
> > > Hi All,
> > >
> > > Please go through this design and share your suggestions.
> > >
> > > Regards,
> > > Chaitanya
> > >
> > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani wrote:
> > >
> > > > +1.
> > > >
> > > > This is one of the common use cases in batch scenarios, and it will be great to have it for streams as well, using managed state and the spillableMultiMap structure.
> > > >
> > > > Regards,
> > > > Mohit
> > > >
> > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <devendra.vyavahare@gmail.com> wrote:
> > > >
> > > > > +1 for the proposal.
> > > > >
> > > > > This will be useful for joins that need to hold large amounts of data intermediately.
> > > > >
> > > > > ~ Yogi
> > > > >
> > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <chaitanya@datatorrent.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > The Malhar library has an in-memory join operator, which cannot process large amounts of data because of checkpointing and memory bottlenecks. To avoid these, I am proposing an inner join operator using Managed State, which was recently added to Malhar.
> > > > > >
> > > > > > Details of the inner join operator, Managed State, and the design of the inner join operator using Managed State are given below:
> > > > > >
> > > > > > Inner Join Operator
> > > > > > --------------------------
> > > > > > The join operator pairs tuples from two relational streams that match the join condition and emits the paired tuples.
> > > > > >
> > > > > > For example, let S1 and S2 be two input streams with the join condition
> > > > > >
> > > > > >     (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > > > > >
> > > > > > where a and b are tuples arriving on S1 and S2 respectively, and t1 is the time period.
> > > > > >
> > > > > > Based on this join condition, the join operator needs to store tuples for time t1. Let JS1 and JS2 be the respective stores where these tuples are kept.
> > > > > >
> > > > > > Let (k1, v1) be a tuple arriving on stream S1. The join operation consists of the following steps:
> > > > > >
> > > > > > 1. Insert (k1, v1) into the store JS1.
> > > > > > 2. For key k1, get the values from the store JS2.
> > > > > > 3. If step 2 returns a non-empty set, apply the join conditions and emit the matching pairs.
> > > > > >
> > > > > > The same procedure applies to a tuple arriving on stream S2, with JS1 and JS2 swapped in the steps above.
> > > > > >
> > > > > > Managed State
> > > > > > --------------------
> > > > > > Managed State is an incrementally checkpointed, fault-tolerant key-value data structure. For more information about Managed State, please see:
> > > > > >
> > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > > > > >
> > > > > > Abstract Join Operator
> > > > > > --------------------------------
> > > > > > An abstract implementation of the join operator is available in the com.datatorrent.lib.join package. JoinStore, the store interface used by the join operator, is in the same package.
> > > > > > For more details about the join operator, please see:
> > > > > >
> > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > > > > >
> > > > > > Design of Join Operator Using Managed State
> > > > > > -------------------------------------------------------------
> > > > > > We need to provide a concrete store backed by Managed State, which would look like this:
> > > > > >
> > > > > >   import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
> > > > > >   import com.datatorrent.lib.join.JoinStore;
> > > > > >
> > > > > >   public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
> > > > > >   {
> > > > > >     // JoinStore methods to be implemented on top of Managed State
> > > > > >   }
> > > > > >
> > > > > > Data in the managed store is a map of byte[] to byte[].
> > > > > >
> > > > > > If the value is a list, inserting (k1, v1) into the store JS1 requires the following steps:
> > > > > > (1) For the key k1, get the value from the store JS1. (If the key is not present in memory, this requires searching all the time buckets.)
> > > > > > (2) Convert that value to a list.
> > > > > > (3) Add the value v1 to the list.
> > > > > > (4) Convert the new list to a Slice.
> > > > > > (5) Insert (k1, new Slice) into the managed state.
> > > > > >
> > > > > > Values as lists have been discussed here:
> > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > > > > >
> > > > > > Based on the above JIRA, I am suggesting a SpillableMultiMap data structure for lists of values. Let this store be called SpillableMultiMapJoinStore.
> > > > > >
> > > > > > Based on the above details, the store is chosen according to the key type:
> > > > > >
> > > > > > Key            Store
> > > > > > -----------    --------------------------
> > > > > > Primary        ManagedJoinStore
> > > > > > Not primary    SpillableMultiMapJoinStore
> > > > > >
> > > > > > The following additional properties would be exposed by the join operator:
> > > > > >
> > > > > > - isPrimaryKey: whether the key is a primary key.
> > > > > > - noOfBuckets: number of key buckets. This parameter is required for Managed State.
> > > > > >
> > > > > > Please provide your suggestions.
> > > > > >
> > > > > > Regards,
> > > > > > Chaitanya
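
The join steps 1-3 and the list-valued insert steps (1)-(5) from the proposal can be illustrated with a small, self-contained Java sketch. It is an in-memory stand-in only, not Managed State, SpillableMap/SpillableArrayListMultiMap, or the Malhar JoinStore API. The names InnerJoinSketch, JoinStoreSketch and Tuple are hypothetical; String keys, Java serialization as the byte[] encoding, and the reading of a primary-key store as "one tuple per key, the newest replaces the old one" are assumptions for illustration. Expiry of tuples older than t1 is omitted, so the stores grow without bound.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch; NOT the Managed State, Spillable* or Malhar JoinStore API.
public class InnerJoinSketch {

  // Hypothetical tuple: join key, payload and event time.
  static final class Tuple implements Serializable {
    final String key;
    final String value;
    final long time;

    Tuple(String key, String value, long time) {
      this.key = key;
      this.value = value;
      this.time = time;
    }
  }

  // Stand-in for JS1/JS2: key -> serialized ArrayList<Tuple> (assumption: String keys, not Slices).
  static final class JoinStoreSketch {
    private final Map<String, byte[]> data = new HashMap<>();
    private final boolean isPrimaryKey;

    JoinStoreSketch(boolean isPrimaryKey) {
      this.isPrimaryKey = isPrimaryKey;
    }

    void insert(Tuple t) {
      // Assumption: a primary key holds one tuple, so the new tuple replaces the old one.
      // Otherwise steps (1)-(5): read and decode the list, append, encode, write back.
      ArrayList<Tuple> list = isPrimaryKey ? new ArrayList<>() : get(t.key);
      list.add(t);
      data.put(t.key, encode(list));
    }

    ArrayList<Tuple> get(String key) {
      byte[] bytes = data.get(key);
      return bytes == null ? new ArrayList<>() : decode(bytes);
    }

    private static byte[] encode(ArrayList<Tuple> list) {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
        out.writeObject(list);
      } catch (IOException e) {
        throw new IllegalStateException(e);
      }
      return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    private static ArrayList<Tuple> decode(byte[] bytes) {
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        return (ArrayList<Tuple>) in.readObject();
      } catch (IOException | ClassNotFoundException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  private final JoinStoreSketch js1;
  private final JoinStoreSketch js2;
  private final long t1; // join window: |a.time - b.time| < t1

  InnerJoinSketch(long t1, boolean isPrimaryKey) {
    this.t1 = t1;
    this.js1 = new JoinStoreSketch(isPrimaryKey);
    this.js2 = new JoinStoreSketch(isPrimaryKey);
  }

  // Steps 1-3 for a tuple from S1; fromS1 == false swaps JS1 and JS2. Returns "s1Value,s2Value".
  List<String> process(Tuple t, boolean fromS1) {
    JoinStoreSketch own = fromS1 ? js1 : js2;
    JoinStoreSketch other = fromS1 ? js2 : js1;
    own.insert(t);                                 // step 1
    List<String> out = new ArrayList<>();
    for (Tuple o : other.get(t.key)) {             // step 2
      if (Math.abs(t.time - o.time) < t1) {        // step 3: time condition
        out.add(fromS1 ? t.value + "," + o.value : o.value + "," + t.value);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    InnerJoinSketch join = new InnerJoinSketch(10, false);
    System.out.println(join.process(new Tuple("k1", "a1", 100), true));  // []
    System.out.println(join.process(new Tuple("k1", "b1", 105), false)); // [a1,b1]
    System.out.println(join.process(new Tuple("k1", "b2", 120), false)); // []
  }
}
```

With t1 = 10, b1 pairs with a1 (5 time units apart) while b2 (20 units later) does not. With isPrimaryKey = true, insert skips the read-and-decode of step (1), which is one way to see the cost difference between the two rows of the key/store table.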