From: Adrian Bartnik
Subject: Flink program with for loop yields wrong results when run in parallel
To: user@flink.apache.org
Date: Mon, 4 Jul 2016 11:56:06 +0200

Hi,

I have a Flink program which outputs wrong results once I set the parallelism to a value larger than 1.
If I run the program with parallelism 1, everything works fine.
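
For context, this is how I set the parallelism (a minimal sketch of my setup; the real job is more involved):

    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4) // any value > 1 yields wrong results; 1 works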

The algorithm works on a single input dataset, which is split iteratively until the desired number of output splits is reached.
How to split the cluster in each iteration is itself determined iteratively.

Pseudocode:

val input: DataSet[...] = ...

for (currentSplitNumber <- 1 to numberOfSplits) { // Split the dataset until the desired number of splits is reached
    // Iteratively compute best split
    val determinedSplit = ... // computed by an iteration involving input

    // Split dataset to 2 smaller ones
    val tmpDataSet1 = determinedSplit.filter(_ == 1) ...
    val tmpDataSet2 = determinedSplit.filter(_ == 0) ...

    tmpDataSet1.count() // These counts are necessary to record the size of each split
    tmpDataSet2.count()

    // Store tmpDataSet1 and 2 as they are needed in one of the next loop executions (as dataset to be split)
    ...

}
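
To make the pseudocode concrete, here is a simplified sketch of the same structure. The element type (id, label), the queue, numberOfSplits, and the identity-style step function are placeholders for my real (elided) logic:

    import org.apache.flink.api.scala._
    import scala.collection.mutable

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input: DataSet[(Long, Int)] = env.fromElements((1L, 0), (2L, 1)) // (id, label)
    val numberOfSplits = 4 // placeholder

    val pending = mutable.Queue(input) // splits still to be processed
    for (currentSplitNumber <- 1 to numberOfSplits) {
      val current = pending.dequeue()

      // inner bulk iteration: compute the best split of `current`
      val determinedSplit = current.iterate(10) { ds =>
        ds.map(x => x) // placeholder refinement step
      }

      val tmpDataSet1 = determinedSplit.filter(_._2 == 1)
      val tmpDataSet2 = determinedSplit.filter(_._2 == 0)

      val size1 = tmpDataSet1.count() // count() triggers execution up to this point
      val size2 = tmpDataSet2.count()

      pending.enqueue(tmpDataSet1, tmpDataSet2) // needed in later loop executions
    }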

It all comes down to two nested loops, one of which can be replaced by an iteration.
As nested iterations are not supported yet, I do not know how to avoid the outer loop.
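
For clarity, a direct translation of the outer loop into a Flink iteration would put one iteration inside another, which is exactly the unsupported nesting (sketch; the map is a placeholder step):

    // outer iteration whose step function itself contains a bulk iteration
    val result = input.iterate(numberOfSplits) { outer =>
      outer.iterate(10) { inner => inner.map(x => x) } // nested: not supported
    }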

Is this a known problem, and if so, what would be a solution?

Best,
Adrian