From: Aniket Bhatnagar
Date: Thu, 08 Oct 2015 11:04:24 +0000
Subject: Re: Example of updateStateByKey with initial RDD?
To: Bryan, user

Here is an example:

// Bucket width: one minute, assuming event.timestamp is in milliseconds.
val interval = 60 * 1000
val counts = eventsStream.map(event => {
  // Key each event by the start of the interval it falls into.
  (event.timestamp - event.timestamp % interval, event)
}).updateStateByKey[Long](updateFunc = (events: Seq[Event], prevStateOpt: Option[Long]) => {
  // Add this batch's events to the running count for the key.
  val prevCount = prevStateOpt.getOrElse(0L)
  val newCount = prevCount + events.size
  Some(newCount)
})
counts.print()
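
The example above uses the single-argument form of updateStateByKey and does not pass an initial RDD. A minimal sketch of the overload that takes a partitioner and an initial RDD might look like the following. The Event case class, the object name, and the initialCounts parameter are hypothetical names introduced for illustration; the thread does not show them. The initial RDD has to be keyed the same way as the stream and carry the state type, here RDD[(Long, Long)].

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical event type; the thread does not show its definition.
case class Event(timestamp: Long)

object InitialStateSketch {
  // One-minute buckets, assuming timestamps are in milliseconds.
  val interval = 60 * 1000L

  // Same counting logic as the example above, written as a named function.
  def updateCount(events: Seq[Event], prev: Option[Long]): Option[Long] =
    Some(prev.getOrElse(0L) + events.size)

  // initialCounts (hypothetical) holds pre-existing counts per bucket.
  // Its element type must be (key type, state type), i.e. (Long, Long) here.
  def countsWithInitialState(
      eventsStream: DStream[Event],
      initialCounts: RDD[(Long, Long)]): DStream[(Long, Long)] = {
    val partitioner =
      new HashPartitioner(eventsStream.context.sparkContext.defaultParallelism)
    eventsStream
      .map(event => (event.timestamp - event.timestamp % interval, event))
      .updateStateByKey[Long](updateCount _, partitioner, initialCounts)
  }
}
```

A mismatch between the initial RDD's element type and the stream's (key, state) types is one possible source of a compile-time error with this overload. Stateful operations such as updateStateByKey also need a checkpoint directory set on the StreamingContext (ssc.checkpoint(...)) before the context is started.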

Hope it helps!

Thanks,
Aniket

On Thu, Oct 8, 2015 at 4:29 PM Bryan <bryan.jeffrey@gmail.com> wrote:
> Hello,
>
> Can anyone point me to a good example of updateStateByKey with an initial RDD? I am seeing a compile-time error when following the API.
>
> Regards,
>
> Bryan Jeffrey