From: Mohit Jaggi
Date: Thu, 2 Jul 2015 10:27:43 -0700
Subject: Re: Grouping runs of elements in a RDD
To: RJ Nowling
Cc: "Abhishek R. Singh", Reynold Xin, dev@spark.apache.org, user@spark.apache.org

If you are joining successive lines together based on a predicate, then you are doing a "flatMap", not an "aggregate". You are on the right track with a multi-pass solution. I had the same challenge when I needed a sliding window over an RDD (see below). [I had suggested that the sliding window API be moved to spark-core; not sure if that happened.]
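For reference, a minimal sketch of that sliding helper (assuming Spark 1.4 and the MLlib RDDFunctions implicit linked in the previous posts below; the sample data and window size are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.rdd.RDDFunctions._ // adds sliding() to RDDs

object SlidingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sliding-sketch"))
    val lines = sc.parallelize(Seq("a", "b", "c", "d", "e"), numSlices = 3)

    // sliding(2) yields every pair of consecutive elements, including pairs
    // that straddle partition boundaries, which is what the boundary check needs.
    lines.sliding(2).map { case Array(prev, cur) => (prev, cur) }
      .collect().foreach(println)

    sc.stop()
  }
}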
----- previous posts ---

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions

> On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi wrote:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3CCALRVTpKN65rOLzbETC+Ddk4O+YJm+TfAF5DZ8EuCpL-2YHYPZA@mail.gmail.com%3E
>
> You can use the MLlib function or do the following (which is what I had done):
>
> - In the first pass over the data, using mapPartitionsWithIndex, gather the first item in each partition. You can use collect (or an aggregator) for this. "Key" them by the partition index. At the end, you will have a map (partition index) --> first item.
> - In the second pass over the data, using mapPartitionsWithIndex again, look at two items at a time (or, in the general case, N items at a time; you can use Scala's sliding iterator) and check the time difference (or do any other sliding-window computation). Pass the map created in the previous step to this mapPartitions call; you will need it to check the last item in each partition.
>
> If you can tolerate a few inaccuracies, then you can do just the second step. You will miss the "boundaries" of the partitions, but that might be acceptable for your use case.
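A rough sketch of the two-pass approach quoted above, specialized to a pairwise computation over consecutive elements (the Long timestamps, the delta computation, and all names are illustrative assumptions, not from the original thread):

import org.apache.spark.rdd.RDD

// Pass 1 collects the first item of each partition; pass 2 slides a window
// of 2 over each partition, borrowing the next partition's head so the
// pair at the partition boundary is not lost.
def pairwiseDeltas(rdd: RDD[Long]): RDD[Long] = {
  val numPartitions = rdd.partitions.length

  // Pass 1: first item of each partition, keyed by partition index.
  val heads: Map[Int, Long] = rdd.mapPartitionsWithIndex { (idx, it) =>
    if (it.hasNext) Iterator((idx, it.next())) else Iterator.empty
  }.collect().toMap
  val headsBc = rdd.sparkContext.broadcast(heads)

  // Pass 2: extend each partition with the head of the next non-empty
  // partition, then compute deltas over consecutive pairs.
  rdd.mapPartitionsWithIndex { (idx, it) =>
    val nextHead = (idx + 1 until numPartitions)
      .flatMap(i => headsBc.value.get(i)).headOption
    (it ++ nextHead.iterator).sliding(2).withPartial(false)
      .map { case Seq(a, b) => b - a }
  }
}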
On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling wrote:

> That's an interesting idea! I hadn't considered that. However, looking at the Partitioner interface, I would need to decide from looking at a single key, which doesn't fit my case, unfortunately. For my case, I need to compare successive pairs of keys. (I'm trying to re-join lines that were split prematurely.)
>
> On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh wrote:
>
>> Could you use a custom partitioner to preserve boundaries such that all related tuples end up on the same partition?
>>
>> On Jun 30, 2015, at 12:00 PM, RJ Nowling wrote:
>>
>> Thanks, Reynold. I still need to handle incomplete groups that fall between partition boundaries, so I need a two-pass approach. I came up with a somewhat hacky way to handle those using the partition indices and key-value pairs as a second pass after the first.
>>
>> OCaml's standard library provides a function called group() that takes a break function that operates on pairs of successive elements. It seems a similar approach could be used in Spark and would be more efficient than my approach with key-value pairs, since you know the ordering of the partitions.
>>
>> Has this need been expressed by others?
>>
>> On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin wrote:
>>
>>> Try mapPartitions, which gives you an iterator, and you can produce an iterator back.
>>>
>>> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a problem where I have an RDD of elements:
>>>>
>>>> Item1 Item2 Item3 Item4 Item5 Item6 ...
>>>>
>>>> and I want to run a function over them to decide which runs of elements to group together:
>>>>
>>>> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>>>>
>>>> Technically, I could use aggregate to do this, but I would have to use a List of List of T, which would produce a very large collection in memory.
>>>>
>>>> Is there an easy way to accomplish this? E.g., it would be nice to have a version of aggregate where the combination function can return a complete group that is added to the new RDD and an incomplete group which is passed to the next call of the reduce function.
>>>>
>>>> Thanks,
>>>> RJ
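A minimal sketch of the mapPartitions idea Reynold suggests above, with an OCaml-style break predicate as RJ describes. It groups runs lazily within each partition only, so runs that straddle partition boundaries still need the second pass discussed earlier; the groupRuns name and signature are illustrative, not an existing API:

import org.apache.spark.rdd.RDD

// Group runs of elements using a "break" predicate on successive pairs.
// Groups are produced lazily from an iterator, so no List-of-List for the
// whole RDD is ever materialized.
def groupRuns[T](rdd: RDD[T])(break: (T, T) => Boolean): RDD[List[T]] =
  rdd.mapPartitions { it =>
    val buffered = it.buffered
    new Iterator[List[T]] {
      def hasNext: Boolean = buffered.hasNext
      def next(): List[T] = {
        var prev = buffered.next()
        val run = scala.collection.mutable.ListBuffer(prev)
        // Extend the current run until the break predicate fires.
        while (buffered.hasNext && !break(prev, buffered.head)) {
          prev = buffered.next()
          run += prev
        }
        run.toList
      }
    }
  }

For the line-joining case, the break predicate might be something like (a, b) => !isContinuation(b), where isContinuation is whatever (hypothetical) test marks a prematurely split line.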