From: Aljoscha Krettek
To: user@flink.apache.org
Cc: Elias Levy
Date: Wed, 26 Apr 2017 14:11:44 +0200
Subject: Re: Re-keying / sub-keying a stream without repartitioning

Hi Elias,
Sorry for the delay; this must have fallen through the cracks after Flink Forward.

I did spend some time thinking about this, and we have had the idea for a while now to add an operation like “keyByWithoutPartitioning()” (name not final ;-) that would allow the user to tell the system that a reshuffle is not needed. This would only work if the key type (and the keys themselves) stayed exactly the same.

I think it wouldn't work for your case, because the key type changes and elements with key (A, B) would normally be shuffled to different instances than elements with key (A); e.g. (1, 1) does not, in general, belong to the same key group as (1). Would you agree that this happens in your case?
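To illustrate why (1, 1) and (1) can end up on different instances, here is a simplified model of keyed routing: a key's hashCode() is mixed into one of maxParallelism key groups, and contiguous ranges of key groups are assigned to parallel subtasks (keyGroup * parallelism / maxParallelism, modeled on Flink's key-group assignment). This is a sketch, not Flink's code: the bit mixer is a stand-in for Flink's own murmur-based hash, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of keyed routing (illustrative sketch, not Flink's code):
// key -> hashCode() -> key group -> subtask index.
final class KeyGroupModel {

    // Stand-in bit mixer (assumption): spreads the hashCode bits and
    // clears the sign bit so the modulo below is never negative.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7fffffff;
    }

    // Each key maps to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Key groups are split into contiguous ranges, one range per subtask.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        Object keyA = 1;                            // key (A)    = (1)
        List<Integer> keyAB = Arrays.asList(1, 1);  // key (A, B) = (1, 1)
        // The two keys have different hashCode() values, so nothing forces
        // them into the same key group or onto the same subtask.
        System.out.println("(1)    -> subtask " + subtaskFor(keyA, maxParallelism, parallelism));
        System.out.println("(1, 1) -> subtask " + subtaskFor(keyAB, maxParallelism, parallelism));
    }
}
```

Whether the two printed subtasks coincide depends purely on the hash values, which is why a stream keyed by (A, B) cannot be assumed to stay on the subtask that owned A.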

Best,
Aljoscha 

On 25. Apr 2017, at 23:32, Elias Levy <fearsome.lucidity@gmail.com> wrote:

Anyone?

On Fri, Apr 21, 2017 at 10:15 PM, Elias Levy <fearsome.lucidity@gmail.com> wrote:
This is something that has come up on the list before, but in a different context. I need to rekey a stream, but I would prefer that the stream not be repartitioned. There is no gain from repartitioning: the new partition key is a composite that extends the current stream key, going from a key of A to a key of (A, B), so all values of the resulting streams are already routed to the same node, and repartitioning them to other nodes would simply generate unnecessary network traffic and serde overhead.

Unlike previous use cases, I am not trying to perform aggregate operations. Instead, I am executing CEP patterns. Some patterns apply to the stream keyed by A and some to the stream keyed by (A, B).

The API does not appear to have an obvious solution to this situation. keyBy() will repartition, and there isn't anything like a subKey() to sub-partition a stream without repartitioning (e.g. keyBy(A).subKey(B)).

I suppose I could accomplish this by using partitionCustom() with a partitioner that ignores the second element of the key and delegates to the default partitioner, passing it only the first element, so that task assignment does not change.
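The partitionCustom() idea can be sketched as a partitioner that projects the composite key (A, B) down to A and hands only A to a delegate. The Partitioner interface below is redeclared locally so the sketch compiles without Flink; it mirrors the shape of Flink's org.apache.flink.api.common.functions.Partitioner (int partition(K key, int numPartitions)). CompositeKey and PrefixPartitioner are hypothetical names, and the hash-modulo delegate is only a stand-in: to really preserve the task assignment, the delegate would have to reproduce exactly what keyBy(A) computes.

```java
// Local stand-in with the same single-method shape as Flink's Partitioner,
// redeclared so this sketch compiles on its own.
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical composite key (A, B).
final class CompositeKey<A, B> {
    final A a;
    final B b;

    CompositeKey(A a, B b) {
        this.a = a;
        this.b = b;
    }
}

// Routes (A, B) using only A, so every (A, B) lands on the same partition
// that A alone would be sent to by the delegate.
final class PrefixPartitioner<A, B> implements Partitioner<CompositeKey<A, B>> {
    private final Partitioner<A> delegate;

    PrefixPartitioner(Partitioner<A> delegate) {
        this.delegate = delegate;
    }

    @Override
    public int partition(CompositeKey<A, B> key, int numPartitions) {
        return delegate.partition(key.a, numPartitions); // key.b is ignored
    }
}
```

One design caveat: a custom partitioner only receives numPartitions, so a delegate that mimics key-group-based routing would also need the max parallelism supplied some other way, for example as a constructor argument.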

Thoughts?

