Date: Tue, 28 Mar 2017 03:02:41 +0000 (UTC)
From: "Aviem Zur (JIRA)"
To: commits@beam.apache.org
Subject: [jira] [Updated] (BEAM-1074) Set default-partitioner in SourceRDD.Unbounded.

    [ https://issues.apache.org/jira/browse/BEAM-1074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aviem Zur updated BEAM-1074:
----------------------------
    Description:
The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and this stateful operation is followed by a shuffle:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159

Since the stateful read maps "splitSource" -> "partition of a list of read values", the following shuffle gains nothing (the list of read values has not been flatMapped yet). To avoid the shuffle, we need to set the partitioner of the input RDD ({{SourceRDD.Unbounded}}) to a default {{HashPartitioner}}, since {{mapWithState}} uses the same default partitioner and skips the shuffle when the partitioners match.

  was:
This will make sure the following stateful read within {{mapWithState}} won't shuffle the read values as long as they are grouped in a {{List}}.


> Set default-partitioner in SourceRDD.Unbounded.
> -----------------------------------------------
>
>                 Key: BEAM-1074
>                 URL: https://issues.apache.org/jira/browse/BEAM-1074
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Aviem Zur
>             Fix For: First stable release
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
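The shuffle-avoidance rule in the description can be illustrated with a small, Spark-free Java model. This is only a sketch under stated assumptions: it assumes Spark's partitionBy returns the input RDD unchanged when the RDD's reported partitioner equals the target one, that two HashPartitioners are equal when they have the same number of partitions, and that {{mapWithState}} falls back to a HashPartitioner over the context's default parallelism when the StateSpec sets no partitioner. The HashPartitioner class and needsShuffle method below are hypothetical stand-ins, not Spark or Beam APIs. In SourceRDD.Unbounded itself, the change would presumably amount to having the RDD's partitioner() report such a HashPartitioner.

```java
import java.util.Optional;

/** Spark-free model of "skip the shuffle when the partitioners match" (BEAM-1074). */
public class PartitionerMatchSketch {

  /** Hypothetical stand-in mimicking org.apache.spark.HashPartitioner. */
  static final class HashPartitioner {
    final int numPartitions;

    HashPartitioner(int numPartitions) {
      // Simplification: require at least one partition so getPartition never divides by zero.
      if (numPartitions <= 0) {
        throw new IllegalArgumentException("numPartitions must be > 0");
      }
      this.numPartitions = numPartitions;
    }

    /** Non-negative modulo of the key's hash code; null keys go to partition 0. */
    int getPartition(Object key) {
      if (key == null) {
        return 0;
      }
      int mod = key.hashCode() % numPartitions;
      return mod < 0 ? mod + numPartitions : mod;
    }

    /** Equal partitioners place every key in the same partition, so no data has to move. */
    @Override
    public boolean equals(Object other) {
      return other instanceof HashPartitioner
          && ((HashPartitioner) other).numPartitions == numPartitions;
    }

    @Override
    public int hashCode() {
      return numPartitions;
    }
  }

  /** Hypothetical helper: a shuffle is needed unless the input already reports the target partitioner. */
  static boolean needsShuffle(Optional<HashPartitioner> inputPartitioner, HashPartitioner target) {
    return !(inputPartitioner.isPresent() && inputPartitioner.get().equals(target));
  }

  public static void main(String[] args) {
    int defaultParallelism = 4; // hypothetical value of the SparkContext's default parallelism
    HashPartitioner stateDefault = new HashPartitioner(defaultParallelism);

    // Input RDD reports no partitioner: the read values are shuffled.
    System.out.println(needsShuffle(Optional.empty(), stateDefault)); // true
    // Input RDD reports an equal default HashPartitioner: the shuffle is skipped.
    System.out.println(
        needsShuffle(Optional.of(new HashPartitioner(defaultParallelism)), stateDefault)); // false
    // Different partition count: the partitioners do not match, so the shuffle still happens.
    System.out.println(needsShuffle(Optional.of(new HashPartitioner(8)), stateDefault)); // true
  }
}
```

The third case shows why the partition count matters: reporting just any HashPartitioner is not enough, since it has to match the one {{mapWithState}} ends up using.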