Date: Wed, 19 Oct 2016 01:46:59 +0000 (UTC)
From: "Xiaowei Jiang (JIRA)"
To: dev@flink.apache.org
Subject: [jira] [Created] (FLINK-4855) Add partitionedKeyBy to DataStream

Xiaowei Jiang created FLINK-4855:
------------------------------------

             Summary: Add partitionedKeyBy to DataStream
                 Key: FLINK-4855
                 URL: https://issues.apache.org/jira/browse/FLINK-4855
             Project: Flink
          Issue Type: Improvement
          Components: DataStream API
            Reporter: Xiaowei Jiang
            Assignee: MaGuowei

After we apply any interesting operation (e.g. reduce) on a KeyedStream, the result becomes a DataStream. In many cases the output still has the same keys as the KeyedStream, or keys compatible with them (logically). But to do further operations on these keys, we are forced to call keyBy again. This works semantically, but it is costly in two respects. First, it destroys the possibility of chaining, which is one of the most important optimization techniques. Second, keyBy greatly expands the connected components of tasks, which has implications for failover optimization.

To address this shortcoming, we propose a new operator, partitionedKeyBy:

    class DataStream<T> {
        public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key);
    }

Semantically, DataStream.partitionedKeyBy(key) is equivalent to DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task ID (subtask index) as an extra field. This guarantees that records coming from different tasks can never produce the same key. With this, it is possible to write

    ds.keyBy(key1).reduce(func1)
      .partitionedKeyBy(key1).reduce(func2)
      .partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases we will be able to chain these operators into a single vertex.
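
For illustration only (not part of the proposal itself), here is a minimal sketch of how the described equivalence could be emulated with the existing DataStream API: each record is tagged with the index of the subtask that produced it, and keyBy is then applied to (subtaskIndex, key). The class name PartitionedKeyByEmulation and the word-count input are assumptions made for the example.

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PartitionedKeyByEmulation {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical input: (word, count) pairs, first aggregated per key.
            DataStream<Tuple2<String, Long>> counts = env
                    .fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 1L), Tuple2.of("a", 1L))
                    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Long> value) {
                            return value.f0;
                        }
                    })
                    .sum(1);

            // Emulate partitionedKeyBy(key): attach the producing subtask's index
            // and key by (subtaskIndex, word), so keys coming from different
            // subtasks can never collide -- the equivalence stated above.
            KeyedStream<Tuple3<Integer, String, Long>, Tuple2<Integer, String>> partitioned = counts
                    .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<Integer, String, Long>>() {
                        @Override
                        public Tuple3<Integer, String, Long> map(Tuple2<String, Long> value) {
                            int subtask = getRuntimeContext().getIndexOfThisSubtask();
                            return Tuple3.of(subtask, value.f0, value.f1);
                        }
                    })
                    .keyBy(new KeySelector<Tuple3<Integer, String, Long>, Tuple2<Integer, String>>() {
                        @Override
                        public Tuple2<Integer, String> getKey(Tuple3<Integer, String, Long> value) {
                            return Tuple2.of(value.f0, value.f1);
                        }
                    });

            // Further per-key work on the (now partition-local) keys.
            partitioned.sum(2).print();

            env.execute("partitionedKeyBy emulation sketch");
        }
    }

Unlike the proposed operator, this emulation still performs a network shuffle on the second keyBy; the point of partitionedKeyBy is that, because the key already includes the task ID, the partitioning can stay local and the operators can be chained.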