From: Xiaowei Jiang
Date: Thu, 20 Oct 2016 15:53:30 +0800
Subject: Add partitionedKeyBy to DataStream
To: dev@flink.apache.org

After we do any interesting operation (e.g. reduce) on a KeyedStream, the
result becomes a DataStream. In many cases, the output still has the same
keys as the KeyedStream, or logically compatible ones. But to do further
operations on these keys, we are forced to use keyBy again. This works
semantically, but is costly in two respects. First, it destroys the
possibility of chaining, which is one of the most important optimization
techniques. Second, keyBy greatly expands the connected components of
tasks, which has implications for failover optimization.

To address this shortcoming, we propose a new operator, partitionedKeyBy.

    DataStream<T> {
        public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
    }

Semantically, DataStream.partitionedKeyBy(key) is equivalent to
DataStream.keyBy(partitionedKey), where partitionedKey is key plus the
task id as an extra field. This guarantees that records from different
tasks will never produce the same key.

With this, it's possible to write

    ds.keyBy(key1).reduce(func1)
      .partitionedKeyBy(key1).reduce(func2)
      .partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases, we will be able to chain these into a
single vertex.

Please share your thoughts. The JIRA is at
https://issues.apache.org/jira/browse/FLINK-4855

Xiaowei
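
To make the "key plus task id" semantics above concrete, here is a minimal sketch in plain Java. It does not use Flink at all: the class PartitionedKeySketch, the PartitionedKey type, and the reduceByPartitionedKey helper are hypothetical names invented for this illustration, and the list of per-task outputs merely stands in for the records emitted by parallel upstream tasks. It only shows that grouping by (taskId, key) keeps equal keys from different tasks apart, so each task's records could be reduced locally.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class PartitionedKeySketch {

    /** Hypothetical composite key: the user key plus the id of the producing task. */
    public static final class PartitionedKey<K> {
        final int taskId;
        final K key;

        public PartitionedKey(int taskId, K key) {
            this.taskId = taskId;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PartitionedKey)) {
                return false;
            }
            PartitionedKey<?> other = (PartitionedKey<?>) o;
            return taskId == other.taskId && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(taskId, key);
        }

        @Override
        public String toString() {
            return "(" + taskId + ", " + key + ")";
        }
    }

    /**
     * Reduces records grouped by (taskId, key).
     * outputsPerTask.get(i) stands in for the records emitted by upstream task i.
     */
    public static <T, K> Map<PartitionedKey<K>, T> reduceByPartitionedKey(
            List<List<T>> outputsPerTask,
            Function<T, K> keySelector,
            BinaryOperator<T> reducer) {
        Map<PartitionedKey<K>, T> result = new LinkedHashMap<>();
        for (int taskId = 0; taskId < outputsPerTask.size(); taskId++) {
            for (T record : outputsPerTask.get(taskId)) {
                // The task id is part of the key, so equal user keys from
                // different tasks never land in the same group.
                PartitionedKey<K> pk = new PartitionedKey<>(taskId, keySelector.apply(record));
                result.merge(pk, record, reducer);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two upstream tasks; both emit odd numbers (user key 1).
        List<List<Integer>> outputs = Arrays.asList(
                Arrays.asList(1, 3, 2),
                Arrays.asList(5, 4));

        Map<PartitionedKey<Integer>, Integer> sums =
                reduceByPartitionedKey(outputs, v -> v % 2, Integer::sum);

        // prints {(0, 1)=4, (0, 0)=2, (1, 1)=5, (1, 0)=4}
        System.out.println(sums);
    }
}
```

With a plain keyBy on the parity, the odd values 1, 3 and 5 would all meet in one group (sum 9). Here the odd values of task 0 and task 1 stay in separate groups, which is the property that would allow the second reduce to run in the same task as the first.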