Subject: Re: [cache eviction] partition recomputation in big lineage RDDs
From: Hemant Bhanawat
To: Nicolae Marasoiu
Cc: "user@spark.apache.org"
Date: Thu, 1 Oct 2015 12:51:14 +0530

As I understand it, you don't need a merge of your historical data RDD with your RDD_inc; what you need is a merge of the computation results of your historical RDD with RDD_inc, and so on.

IMO, you should consider having an external row store to hold your computations. I say this because you need to update the rows of a prior computation based on the new data. Spark cached batches are column-oriented, and any update to a Spark cached batch is a costly operation.
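
A minimal sketch of that idea (assuming pair RDDs of results keyed by id; the row-store upsert is stubbed out, since the client API depends on which store you pick):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MergeResults {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("merge-results").setMaster("local[*]"))

    // Results of the prior computation, keyed by id
    // (in practice, read back from the external row store).
    val prior = sc.parallelize(Seq(("a", 10L), ("b", 5L)))

    // Results computed from the new increment only.
    val inc = sc.parallelize(Seq(("b", 2L), ("c", 7L))).reduceByKey(_ + _)

    // Merge the results, not the raw data: keep rows present on either side.
    val merged = prior.fullOuterJoin(inc).mapValues {
      case (old, delta) => old.getOrElse(0L) + delta.getOrElse(0L)
    }

    // 'merged' would then be upserted into the row store,
    // e.g. via foreachPartition and the store's own client.
    merged.collect().foreach(println)
    sc.stop()
  }
}
```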


On Wed, Sep 30, 2015 at 10:59 PM, Nicolae Marasoiu <nicolae.marasoiu@adswizz.com> wrote:

Hi,


An equivalent question would be: can the memory cache be selectively evicted from within a component running in the driver? I know it breaks some abstraction/encapsulation, but I clearly need to evict part of the cache so that it is reloaded with newer values from the DB.


What I basically need is to invalidate the portions of the data which have newer values. The "compute" method should stay the same (read with TableInputFormat).
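
For reference, Spark's caching API works at whole-RDD granularity (unpersist drops all of an RDD's cached blocks, not individual partitions), so a common workaround is to cache the stable and the volatile parts as separate RDDs and drop only the latter. A rough sketch, with readLatestFromDb as a hypothetical stand-in for the TableInputFormat read:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical loader standing in for sc.newAPIHadoopRDD + TableInputFormat.
def readLatestFromDb(sc: SparkContext): RDD[(String, Long)] =
  sc.parallelize(Seq(("c", 7L)))

// Drop only the volatile slice of the cache and reload it with fresh values;
// the stable slice stays cached and untouched.
def refresh(sc: SparkContext,
            stable: RDD[(String, Long)],
            volatile: RDD[(String, Long)]): RDD[(String, Long)] = {
  volatile.unpersist(blocking = true)
  val fresh = readLatestFromDb(sc).cache()
  stable.union(fresh)
}
```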


Thanks
Nicu

From: Nicolae Marasoiu <nicolae.marasoiu@adswizz.com>
Sent: Wednesday, September 30, 2015 4:07 PM
To: user@spark.apache.org
Subject: Re: partition recomputation in big lineage RDDs

Hi,


In fact, my RDD will get a new version (a new RDD assigned to the same var) quite frequently, by merging in batches of ~1000 events from the last 10s.

But recomputation would be more efficient not by re-reading the initial RDD partition(s) and reapplying deltas, but by reading the latest data from HBase and just computing on top of that, if anything.

Basically I guess I need to write my own RDD and implement the compute method by sliding on HBase.
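
That would look roughly like the skeleton below (a sketch only; the HBase scan itself is stubbed out, since connection and serialization details vary):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per HBase key range (illustrative names).
class HBaseScanPartition(val index: Int, val startRow: String, val stopRow: String)
  extends Partition

class LatestFromHBaseRDD(sc: SparkContext, ranges: Seq[(String, String)])
  extends RDD[(String, Array[Byte])](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    ranges.zipWithIndex.map { case ((start, stop), i) =>
      new HBaseScanPartition(i, start, stop): Partition
    }.toArray

  // Re-reads the latest rows for this key range instead of replaying
  // the whole merge lineage.
  override def compute(split: Partition, context: TaskContext): Iterator[(String, Array[Byte])] = {
    val p = split.asInstanceOf[HBaseScanPartition]
    // Here one would open an HBase Scan(p.startRow, p.stopRow) and map
    // each Result to a (rowKey, value) pair; stubbed out in this sketch.
    Iterator.empty
  }
}
```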

Thanks,
Nicu

From: Nicolae Marasoiu <nicolae.marasoiu@adswizz.com>
Sent: Wednesday, September 30, 2015 3:05 PM
To: user@spark.apache.org
Subject: partition recomputation in big lineage RDDs

Hi,


If I implement a way to keep an up-to-date version of my RDD by ingesting some new events, called RDD_inc (from "increment"), and I provide a "merge" function m(RDD, RDD_inc) which returns RDD_new, it looks like I can evolve the state of my RDD by constructing new RDDs all the time, doing it in a manner that hopes to reuse as much data from the past RDD as possible and make the rest garbage-collectable. An example merge function would be a join on some ids, creating a merged state for each element. The type of the result of m(RDD, RDD_inc) is the same as that of RDD.
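
For concreteness, a hedged sketch of one such m: a join on ids that folds the increment into the prior state. State here is a made-up element type; the real merged state depends on the job:

```scala
import org.apache.spark.rdd.RDD

// Illustrative element type for the merged state.
case class State(count: Long, lastSeen: Long)

// m(RDD, RDD_inc): join on id and build a merged state per element.
// The result type equals the input type, so merges can be chained.
def merge(prev: RDD[(String, State)], inc: RDD[(String, State)]): RDD[(String, State)] =
  prev.fullOuterJoin(inc).mapValues {
    case (old, add) =>
      val o = old.getOrElse(State(0L, 0L))
      val a = add.getOrElse(State(0L, 0L))
      State(o.count + a.count, math.max(o.lastSeen, a.lastSeen))
  }
```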


My question is: how does recomputation work for such an RDD, which is not the direct result of an HDFS load, but the result of a long lineage of such functions/transformations?


Let's say my RDD, after 2 merge iterations, now looks like this:

RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)


When recomputing a part of RDD_new, here are my assumptions:

- only full partitions are recomputed, nothing more granular?

- the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed

- the functions are applied


And this seems overly simplistic, since in the general case the partitions do not fully align between all these RDDs. The other aspect is the potentially redundant load of data which is in fact no longer required (the data ruled out in the merge).
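
One way to probe these assumptions empirically is toDebugString, which prints the lineage Spark replays when partitions of the final RDD are lost. A small self-contained sketch, with two join-based merge steps standing in for merge(merge(RDD, RDD_inc1), RDD_inc2):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LineageInspect {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("lineage-inspect").setMaster("local[*]"))

    val base = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val inc1 = sc.parallelize(Seq(("a", 3)))
    val inc2 = sc.parallelize(Seq(("b", 4)))

    // Two join-based merge steps, analogous to merge(merge(RDD, inc1), inc2).
    val step1 = base.fullOuterJoin(inc1)
      .mapValues { case (a, b) => a.getOrElse(0) + b.getOrElse(0) }
    val rddNew = step1.fullOuterJoin(inc2)
      .mapValues { case (a, b) => a.getOrElse(0) + b.getOrElse(0) }

    // Prints the DAG replayed on partition loss: recovery is per lost
    // partition, but each join stage pulls whichever parent partitions
    // feed it, so the inputs need not align one-to-one.
    println(rddNew.toDebugString)
    sc.stop()
  }
}
```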


A more detailed version of this question is at https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/


Thanks,

Nicu

