From: Elias Levy
Date: Wed, 13 Apr 2016 18:10:48 -0700
Subject: Re: Does Kafka connector leverage Kafka message keys?
To: user@flink.apache.org

On Wed, Apr 13, 2016 at 2:10 AM, Stephan Ewen wrote:

> If you want to use Flink's internal key/value state, however, you need to
> let Flink re-partition the data by using "keyBy()". That is because Flink's
> internal sharding of state (including the re-sharding to adjust parallelism
> we are currently working on) follows a dedicated hashing scheme which is
> with all likelihood different from the partition function that writes the
> key/value pairs to the Kafka Topics.

That is interesting, if somewhat disappointing. I was hoping that performing
a keyBy from a Kafka source would perform no reshuffling if you used the same
value as you used for the Kafka message key. But it makes sense if you are
using different hash functions.

It may be useful to have a variant of keyBy() that converts the stream to a
KeyedStream but performs no shuffling if the caller is certain that the
DataStream is already partitioned by the given key.
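The point about differing hash functions can be illustrated with a small, self-contained sketch. It is not Kafka's or Flink's actual partitioning code: `producerSidePartition` (FNV-1a over the key's UTF-8 bytes) and `engineSidePartition` (Java's `hashCode()`) are hypothetical stand-ins chosen only to show that two schemes can send the same key to different partition indices, which is why data partitioned by one cannot be assumed to be partitioned correctly for the other.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: neither method reproduces Kafka's or Flink's real partitioner.
class PartitionMismatch {

    // Hypothetical stand-in for the partition function used when writing to the topic:
    // 32-bit FNV-1a over the UTF-8 bytes of the key, reduced modulo the partition count.
    static int producerSidePartition(String key, int numPartitions) {
        int hash = 0x811C9DC5;              // FNV-1a 32-bit offset basis
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xFF);
            hash *= 0x01000193;             // FNV-1a 32-bit prime
        }
        return Math.floorMod(hash, numPartitions);
    }

    // Hypothetical stand-in for the engine's internal hashing scheme:
    // the key's Java hashCode(), reduced modulo the parallelism.
    static int engineSidePartition(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts the keys that the two schemes route to different partition indices.
    static int countMismatches(String[] keys, int n) {
        int mismatches = 0;
        for (String key : keys) {
            if (producerSidePartition(key, n) != engineSidePartition(key, n)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "d"};
        int n = 4;
        for (String key : keys) {
            System.out.println(key + ": producer=" + producerSidePartition(key, n)
                    + " engine=" + engineSidePartition(key, n));
        }
        System.out.println("mismatches: " + countMismatches(keys, n));
    }
}
```

Whenever the two functions disagree for a key, a record read from topic partition p would have to move to a different parallel instance before keyed state for that key could be used, so a no-shuffle variant of keyBy() would only be safe if both sides were known to use the same function and the same partition count.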