Date: Tue, 28 Mar 2017 03:04:41 +0000 (UTC)
From: "Aviem Zur (JIRA)"
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Subject: [jira] [Updated] (BEAM-848) A better shuffle after reading from within mapWithState.

[ https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aviem Zur updated BEAM-848:
---------------------------
    Description: It would be wise to shuffle the read values _after_ flatmap to increase parallelism in processing of the data.  (was: The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and this stateful operation will be followed by a shuffle: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159

Since the stateful read maps "splitSource" -> "partition of a list of read values", the following shuffle won't benefit in any way (the list of read values has not been flatMapped yet).

In order to avoid shuffle we need to set the input RDD ({{SourceRDD.Unbounded}}) partitioner to be a default {{HashPartitioner}} since {{mapWithState}} would use the same partitioner and will skip shuffle if the partitioners match.

It would be wise to shuffle the read values _after_ flatmap.
I will break this into two tasks:
# Set default-partitioner to the input RDD.
# Shuffle (using Coders) the input.)

> A better shuffle after reading from within mapWithState.
> --------------------------------------------------------
>
>                 Key: BEAM-848
>                 URL: https://issues.apache.org/jira/browse/BEAM-848
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Aviem Zur
>
> It would be wise to shuffle the read values _after_ flatmap to increase parallelism in processing of the data.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
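The reasoning in the issue description can be illustrated with a small, dependency-free simulation. This is only a sketch: it does not use Spark or Beam, and every name in it (`shuffle`, `flat_map_partitions`, `NUM_PARTITIONS`, the two example splits) is hypothetical. It assumes two source splits that each read a list of six values, and four available partitions. It compares shuffling the per-split lists before flattening against shuffling the individual values after flattening.

```python
def shuffle(records, num_partitions, key_fn):
    """Distribute records across partitions by key modulo partition count
    (a stand-in for hash partitioning; not Spark's implementation)."""
    partitions = [[] for _ in range(num_partitions)]
    for record in records:
        partitions[key_fn(record) % num_partitions].append(record)
    return partitions


def flat_map_partitions(partitions):
    """Flatten (split_id, values) records into plain values, partition by partition."""
    return [[v for _split, values in part for v in values] for part in partitions]


NUM_PARTITIONS = 4  # hypothetical cluster parallelism

# Hypothetical stateful-read output: one record per split, holding a list of read values.
reads = [(0, list(range(0, 6))), (1, list(range(6, 12)))]

# Shuffle first (keyed by split id), then flatten: the lists move as whole units.
before = flat_map_partitions(shuffle(reads, NUM_PARTITIONS, key_fn=lambda r: r[0]))

# Flatten first, then shuffle the individual values.
flat = [v for _split, values in reads for v in values]
after = shuffle(flat, NUM_PARTITIONS, key_fn=lambda v: v)

busy_before = sum(1 for p in before if p)
busy_after = sum(1 for p in after if p)
print(busy_before, busy_after)
```

Under these assumptions, shuffling before the flatten can never occupy more partitions than there are splits, whereas shuffling after the flatten can spread the individual values over every partition.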