From: Holden Karau
Date: Sun, 8 Jan 2017 21:40:25 -0800
Subject: Re: handling of empty partitions
To: Liang-Chi Hsieh, user@spark.apache.org
Cc: dev@spark.apache.org

Hi Georg,

Thanks for the question along with the code (as well as posting to Stack Overflow). In general, if a question is well suited for Stack Overflow, it's probably better suited to the user@ list than the dev@ list, so I've cc'd the user@ list for you.

As far as handling empty partitions when working with mapPartitions (and similar), the general approach is to return an empty iterator of the correct type when you are given an empty input iterator.

It looks like your code does this; however, it seems you likely have a bug in your application logic: it assumes that if a partition has a record missing a value, either a previous row in the same partition has a good value, OR the previous partition is non-empty and ends with a good row, which need not be the case. You've partially fixed this by collecting the last good value of each partition, and then, if a partition starts without a good value, looking up the value in the collected array. However, if the previous partition is also empty, you need to keep looking back through earlier partitions until you find one with a good value. (Note this also assumes the first record in your dataset is valid; if it isn't, your code will still fail.)

Your solution is really close to working; it just makes some minor assumptions that don't always hold.

Cheers,

Holden :)

On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh wrote:
> Hi Georg,
>
> Can you describe your question more clearly?
>
> Actually, the example code you posted on Stack Overflow doesn't crash as
> you said in the post.
>
> geoHeil wrote
>> I am working on building a custom ML pipeline-model / estimator to impute
>> missing values, e.g. I want to fill with the last good known value.
>> Using a window function is slow / will put the data into a single
>> partition.
>> I built some sample code using the RDD API; however, it has some None / null
>> problems with empty partitions.
>>
>> How should this be implemented properly to handle such empty partitions?
>> http://stackoverflow.com/questions/41474175/spark-mappartitionswithindex-handling-empty-partitions
>>
>> Kind regards,
>> Georg
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
>
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/handling-of-empty-partitions-tp20496p20515.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org

--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
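[Archive editor's note: for concreteness, here is a minimal sketch of the two-pass approach described above, using plain Python lists to stand in for RDD partitions. This is an illustrative model of the algorithm only, not actual Spark code; all names are made up, and `None` marks a missing value.]

```python
def last_good(partition):
    """Pass 1 (per partition): the last non-None value, or None if the
    partition is empty or contains no good value."""
    result = None
    for v in partition:
        if v is not None:
            result = v
    return result


def fill_partition(index, partition, last_good_per_partition):
    """Pass 2 (per partition): forward-fill missing values. If the partition
    starts with missing values, walk backwards over earlier partitions'
    last-good values, skipping partitions that were empty or all-missing --
    this is the step the original code skipped."""
    seed = None
    for j in range(index - 1, -1, -1):
        if last_good_per_partition[j] is not None:
            seed = last_good_per_partition[j]
            break
    out = []
    current = seed
    for v in partition:
        if v is not None:
            current = v
        out.append(current)
    return out


# Partition 1 is empty and partition 2 has no good value, so partition 3
# must look all the way back to partition 0 for its seed.
partitions = [[1, None], [], [None, None], [None, 5, None]]
goods = [last_good(p) for p in partitions]
# goods == [1, None, None, 5]
filled = [fill_partition(i, p, goods) for i, p in enumerate(partitions)]
# filled == [[1, 1], [], [1, 1], [1, 5, 5]]
```

As in the email's caveat: if the very first partition starts with a missing value, there is no earlier good value to fall back on, and this sketch leaves the marker in place.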