Subject: Re: Possible to combine all RDDs from a DStream batch into one?
From: Ted Yu
To: N B
Cc: Jon Chase, user@spark.apache.org
Date: Wed, 15 Jul 2015 20:30:55 -0700

Looks like this method should serve Jon's needs:

    def reduceByWindow(
        reduceFunc: (T, T) => T,
        windowDuration: Duration,
        slideDuration: Duration
      ): DStream[T]
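
For Jon's case, a minimal (untested) sketch of how this could be wired up through the Java API. It assumes a JavaDStream<String> named dStream and a 10-second batch interval; neither comes from this thread:

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // Fold every element of a batch into one value, so the downstream
    // logic runs exactly once per batch interval.
    JavaDStream<String> onePerBatch = dStream.reduceByWindow(
        (a, b) -> a + "\n" + b,    // reduceFunc: combine two elements into one
        Durations.seconds(10),     // windowDuration: exactly one batch wide
        Durations.seconds(10));    // slideDuration: advance one batch at a time

Note that windowDuration and slideDuration must be multiples of the batch interval, so the 10-second values above only make sense under that assumption.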
On Wed, Jul 15, 2015 at 8:23 PM, N B wrote:

> Hi Jon,
>
> In Spark Streaming, 1 batch = 1 RDD. Essentially, the terms are used
> interchangeably. If you are trying to collect multiple batches across a
> DStream into a single RDD, look at the window() operations.
>
> Hope this helps,
> Nikunj
>
>
> On Wed, Jul 15, 2015 at 7:00 PM, Jon Chase wrote:
>
>> I should note that the amount of data in each batch is very small, so I'm
>> not concerned with performance implications of grouping into a single RDD.
>>
>> On Wed, Jul 15, 2015 at 9:58 PM, Jon Chase wrote:
>>
>>> I'm currently doing something like this in my Spark Streaming program
>>> (Java):
>>>
>>>     dStream.foreachRDD((rdd, batchTime) -> {
>>>         log.info("processing RDD from batch {}", batchTime);
>>>         ....
>>>         // my rdd processing code
>>>         ....
>>>     });
>>>
>>> Instead of having my rdd processing code called once for each RDD in the
>>> batch, is it possible to essentially group all of the RDDs from the batch
>>> into a single RDD with a single partition, and therefore operate on all
>>> of the elements in the batch at once?
>>>
>>> My goal here is to do an operation exactly once for every batch. As I
>>> understand it, foreachRDD is going to do the operation once for each RDD
>>> in the batch, which is not what I want.
>>>
>>> I've looked at DStream.repartition(int), but the docs make it sound like
>>> it only changes the number of partitions in the batch's existing RDDs,
>>> not the number of RDDs.
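
Coming back to the window() operations Nikunj mentions above: a hypothetical sketch under the same assumptions (a JavaDStream<String> named dStream, 10-second batch interval), combined with the repartition(int) that Jon already found:

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // window() merges the RDDs of all batches inside the window into one
    // RDD per window; with window == slide == batch interval, that is one
    // RDD per batch. repartition(1) then moves all of that RDD's elements
    // into a single partition.
    JavaDStream<String> singlePartition = dStream
        .window(Durations.seconds(10), Durations.seconds(10))
        .repartition(1);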