Subject: Re: RDD of RDDs
From: kiran lonikar
To: ping yan
Cc: user@spark.apache.org
Date: Tue, 9 Jun 2015 14:04:46 +0530

Possibly in the future, if and when the Spark architecture allows workers to launch Spark jobs (the functions passed to the transformation or action APIs of an RDD), it will be possible to have an RDD of RDDs.
On Tue, Jun 9, 2015 at 1:47 PM, kiran lonikar wrote:

> A similar question was asked before:
> http://apache-spark-user-list.1001560.n3.nabble.com/Rdd-of-Rdds-td17025.html
>
> Here is one of the reasons why I think RDD[RDD[T]] is not possible:
>
> - An RDD is only a handle to the actual data partitions. It has a
> reference/pointer to the *SparkContext* object (*sc*) and a list of
> partitions.
> - The *SparkContext* is an object in the Spark application/driver
> program's JVM. Similarly, the list of partitions lives in the JVM of the
> driver program. Each partition contains, in effect, "remote references"
> to the partition data on the worker JVMs.
> - The functions passed to an RDD's transformations and actions execute
> in the workers' JVMs on different nodes. For example, in *rdd.map { x =>
> x*x }*, the function computing *x*x* runs on the JVMs of the worker
> nodes where the partitions of the RDD reside. These JVMs do not have
> access to *sc*, since it exists only in the driver's JVM.
> - Thus, in the case of your *RDD of RDDs*, *outerRDD.map { innerRDD =>
> innerRDD.filter { x => x > 0 } }*, the worker nodes will not be able to
> execute the *filter* on *innerRDD*, as the code on the worker does not
> have access to *sc* and cannot launch a Spark job.
>
> Hope this helps. You need to consider List[RDD] or some other collection
> instead (a minimal sketch follows at the end of this message).
>
> -Kiran
>
> On Tue, Jun 9, 2015 at 2:25 AM, ping yan wrote:
>
>> Hi,
>>
>> The problem I am looking at is as follows:
>>
>> - I read in a log file of multiple users as an RDD.
>>
>> - I'd like to group the above RDD into *multiple RDDs* by userId (the
>> key).
>>
>> - My processEachUser() function then takes each RDD mapped to an
>> individual user and calls RDD.map or DataFrame operations on it. (I
>> already have the function coded, so I am reluctant to work with the
>> ResultIterable object coming out of rdd.groupByKey()...)
>>
>> I've searched the mailing list and googled "RDD of RDDs", and it seems
>> like it isn't a thing at all.
>>
>> The few remaining choices seem to be: 1) groupByKey() and then work
>> with the ResultIterable object; 2) groupByKey() and then write each
>> group to a file, and read the files back as individual RDDs to process.
>>
>> Has anyone got a better idea, or had a similar problem before?
>>
>> Thanks!
>> Ping
>>
>> --
>> Ping Yan
>> Ph.D. in Management
>> Dept. of Management Information Systems
>> University of Arizona
>> Tucson, AZ 85721
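
For reference, a minimal sketch (in Scala) of the List[RDD] / Map[key, RDD]
alternative suggested above: build one RDD per user by filtering the parent
RDD on the driver. The log path, the tab-separated layout, and the per-user
count are hypothetical placeholders standing in for the actual log format
and the processEachUser() function.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PerUserRdds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-user-rdds"))

    // (userId, logLine) pairs; the path and the tab-separated format
    // are placeholders for the real log layout.
    val logs: RDD[(String, String)] =
      sc.textFile("hdfs:///logs/app.log")
        .map(line => (line.split("\t")(0), line))

    // Collect the distinct keys to the driver, then build one real RDD
    // per user by filtering the parent RDD. This code runs on the driver,
    // where sc is available, so each value below is an ordinary RDD.
    val userIds = logs.keys.distinct().collect()
    val perUser: Map[String, RDD[String]] =
      userIds.map(id => id -> logs.filter(_._1 == id).values).toMap

    // Stand-in for processEachUser(): any RDD or DataFrame logic fits here.
    perUser.foreach { case (id, rdd) =>
      println(s"$id: ${rdd.count()} log lines")
    }

    sc.stop()
  }
}

The trade-off: each per-user RDD triggers a full scan of logs when it is
evaluated, so this is only practical for a modest number of users. With
many users, groupByKey() (option 1 above) or a DataFrame grouped by userId
is usually the cheaper route.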