Subject: Re: join with no element appearing in multiple join-pairs
From: Fridtjof Sander
To: user@flink.apache.org
Date: Mon, 1 Feb 2016 11:37:52 +0100
(tried to reformat)

Hi,

I have a problem that seems to be unsolvable in Flink at the moment (1.0-SNAPSHOT, current master branch),
and I would kindly ask for some input, ideas for alternative approaches, or just a confirming "yup, that doesn't work".

### Here's the situation:

I have a dataset whose elements are totally sorted in ascending order by some key (Int). Each element has a "next" pointer to its successor, which is just another field holding the key of the following element:

x0 -> x1 -> x2 -> x3 -> ... -> xn

The keys do not necessarily increase by 1, so it may be that x0 has key 2, x1 has key 10, x2 has 11, x3 has 25, and so on. I need to process that set in the following way:

iterate:

find all pairs of elements where "next == key" BUT make sure no element appears in multiple pairs

example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair (x1, x2), (x3, x4), ...

then, if some condition is met, combine a pair

run the above procedure again with the pairing condition switched:

example: do pair (x1, x2), (x3, x4), (x5, x6), ... do not pair (x0, x1), (x2, x3), ..

I hope the problem is clear...
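
To make the setup concrete, here is a minimal sketch of the data I have in mind (the Element case class and the sentinel for "no successor" are just illustrative assumptions, not my actual types):

// Illustrative only: a minimal model of the sorted, linked elements.
case class Element(key: Int, next: Int)

// Example with keys 2 -> 10 -> 11 -> 25:
val elements = Seq(
  Element(key = 2,  next = 10),  // x0
  Element(key = 10, next = 11),  // x1
  Element(key = 11, next = 25),  // x2
  Element(key = 25, next = -1)   // x3, -1 meaning "no successor"
)

// The first run should produce the pairs (x0, x1) and (x2, x3),
// the second run only (x1, x2) -- no element in more than one pair per run.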


### Now my approach: pseudo-scala-code:


// zipWithIndex comes from org.apache.flink.api.scala.utils._ and yields
// a DataSet[(Long, Element)] of (index, element) pairs.
val indexed = input.zipWithIndex

// setFlag is assumed to return the element with its joinFlag set.
val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == 0) }

val left  = flagged.filter(el => el.flag)
val right = flagged.filter(el => !el.flag)

left.fullOuterJoin(right)
  .where(el => el.next)
  .equalTo(el => el.key)
  // ...


I attach a temporary key that increases by 1 to my elements using zipWithIndex. Then I map that tempKey to a boolean joinFlag: true if the index is even, false if it is odd. Then I filter for all elements whose flag is true and put them into a dataset that forms the left side of the next == key join. The right side consists of all elements with flag == false. In the second run, I switch the flag construction to el.setFlag(i % 2 != 0).
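
For clarity, the only thing that changes between the two runs is the parity used for the flag. A minimal sketch of how that could be parameterized (run is a hypothetical variable holding the pass number, 0 or 1; Element and setFlag as above):

// Run 0 flags even indices -> pairs (x0, x1), (x2, x3), ...
// Run 1 flags odd indices  -> pairs (x1, x2), (x3, x4), ...
val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == run % 2) }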

That actually works; there is only one problem:


### The problem:


With my approach, I must not lose the total ordering of the data, because the assignment of alternating join flags only works if that ordering is preserved. Initially, the ordering is established by range-partitioning and partition-sorting. However, that ordering is destroyed when the data is shuffled for the join, and I cannot restore it, because I have to run the whole thing inside an iteration, and range-partitioning is not supported within iterations.
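
For reference, this is roughly how the initial ordering is established (field names are again just illustrative; my real job differs in the details):

import org.apache.flink.api.common.operators.Order

// Range-partition by key, then sort within each partition, which gives a
// global ascending order across partitions.
val ordered = input
  .partitionByRange("key")
  .sortPartition("key", Order.ASCENDING)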


### Help?

It all sounds very complicated, but the only thing I really have to solve is that join without any element appearing in multiple pairs (as described in "the situation"). If anyone has any idea how to solve this, that person would really make my day...

Anyways, thanks for your time!

Best, Fridtjof


