Date: Thu, 6 Nov 2014 10:18:15 +0100
Subject: Re: PartitionByHash and usage of KeySelector
From: Aljoscha Krettek
To: user@flink.incubator.apache.org

Hi Stefano,

what do you mean by "creating partitions of each of the partitions"?

The expression keys can be used to specify the fields of objects that
should be used for hashing. So if you had objects of this class in a
DataSet:

class Foo {
    public String bar;
    public Integer baz;
    public Float bat;
}

you could use, for example:

input.partitionByHash("baz", "bat")

to perform the partitioning only on those two fields of the objects.

Regards,
Aljoscha

On Thu, Nov 6, 2014 at 9:19 AM, Stefano Bortoli wrote:
> Hi all,
>
> I am moving my first steps into becoming an Apache Flink user! I have
> configured and run some simple jobs on a small cluster, and everything
> worked quite fine so far.
>
> What I am trying to do right now is to run a duplication detection task on
> dataset of about 9.5M records. The records are well structured, and
> therefore we can exploit the semantic of attributes to narrow down expensive
> match executions.
>
> My idea is the following:
> 1. partition the dataset according to a macro-parameter written in the
> record. This allows me to get to 7 partitions of different sizes but also
> certainly disjoint. I do that by filtering on a specific type.
> 2. create partitions of each of the partitions created in step 1 based on
> some simple similarity that would reduce the number of expensive function. I
> would like to do that by using partitionByHash and KeySelector.
> 3. compute Cross product for each of the partitions defined in step 2;
> 4. filter each pair of the cross product by applying an expensive boolean
> matching function. Only positive matching duplicates will be retained.
>
> Currently I am working on the step 2, and I have some problems understanding
> how to use the partitionByHash function.
> The main problem is that I need to
> have a 'rich key' to support partition, and I discovered the ExpressionKeys
> that would allow me to define hash keys with sets of Strings I can collect
> from the record. However, the partitionByHash function does not allow to use
> these objects as the hash must implement comparable.
>
> So, here is my question: how can I partition considering hash keys of more
> than one String?
>
> Is there a better strategy to implement a de-duplication using Flink?
>
>
> thanks a lot for your support.
>
> kind regards,
>
> Stefano Bortoli, PhD
> ENS Technical Director
> _______________________________________________
> OKKAMSrl - www.okkam.it
>
> Email: bortoli@okkam.it
>
> Phone nr: +39 0461 1823912
>
> Headquarters: Trento (Italy), Via Trener 8
> Registered office: Trento (Italy), via Segantini 23
>
> Confidentially notice. This e-mail transmission may contain legally
> privileged and/or confidential information. Please do not read it if you are
> not the intended recipient(S). Any use, distribution, reproduction or
> disclosure by any other person is strictly prohibited. If you have received
> this e-mail in error, please notify the sender and destroy the original
> transmission and its attachments without reading or saving it in any manner.
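
On the question of hashing on more than one String: a minimal plain-Java
sketch of the idea, not Flink code. It builds a composite key from two
String fields and maps it to a partition index by hash. The Rec class, its
field names, the helper names (keyOf, partitionFor, partition) and the
partition count are all assumptions made up for illustration; in Flink the
same kind of key would presumably be handed to partitionByHash, and that
wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CompositeKeySketch {

    /** Hypothetical record; the field names are illustrative, not from the thread. */
    static final class Rec {
        final String type;
        final String surname;
        final String city;

        Rec(String type, String surname, String city) {
            this.type = type;
            this.surname = surname;
            this.city = city;
        }
    }

    /** Composite key built from two String fields; List supplies equals() and hashCode(). */
    static List<String> keyOf(Rec r) {
        return Arrays.asList(r.surname, r.city);
    }

    /** Maps a key to a bucket in [0, numPartitions); floorMod copes with negative hash codes. */
    static int partitionFor(List<String> key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Distributes records into numPartitions buckets by the hash of their composite key. */
    static List<List<Rec>> partition(List<Rec> records, int numPartitions) {
        List<List<Rec>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Rec r : records) {
            buckets.get(partitionFor(keyOf(r), numPartitions)).add(r);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(
            new Rec("person", "Rossi", "Trento"),
            new Rec("person", "Rossi", "Trento"),
            new Rec("person", "Bianchi", "Milano"));
        List<List<Rec>> buckets = partition(records, 4);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("partition " + i + ": " + buckets.get(i).size() + " record(s)");
        }
    }
}
```

Records whose key fields are equal always land in the same bucket, so the
expensive pairwise match only needs to run inside each bucket. Records with
different keys may still share a bucket, which costs extra comparisons but
does not lose candidate pairs.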