From: Piotr Nowojski
Subject: Re: Implement bunch of transformations applied to same source stream in Apache Flink in parallel and combine result
Date: Thu, 12 Oct 2017 11:31:48 +0200
To: Andrey Salnikov
Cc: user@flink.apache.org

Hi,

What is the number of events per second that you wish to process? If it’s high enough (roughly number of machines * number of cores) you should be just fine: instead of scaling with the number of features, scale with the number of events. If you have a single data source, you could still randomly shuffle events before applying your transformations.

Another solution might be to:
1. Assign a unique eventId and split the original event using flatMap into <featureId, Xi, eventId> tuples
2. keyBy featureId, eventId (or maybe do random partitioning with shuffle?)
3. Perform the transformation
4. keyBy eventId, …
5. Window and reduce

But that would add more overhead compared to processing more events at the same time.

Piotrek

> On 11 Oct 2017, at 23:02, Andrey Salnikov wrote:
>
> Hi!
>
> Could you please help me? I'm trying to use Apache Flink for machine learning tasks with external ensemble/tree libs like XGBoost, so my workflow will be like this:
>
> - receive a single stream of data whose atomic event looks like a simple vector event=(X1, X2, X3...Xn); it can be imagined as POJO fields, so initially we have DataStream<event> source=...
> - a lot of feature extraction code applied to the same event source: feature1 = source.map(X1...Xn), feature2 = source.map(X1...Xn), etc. For simplicity, let DataStream<int> feature(i) = source.map() for all features
> - then I need to create a vector with the extracted features (feature1, feature2, ...featureK); for now it will be 40-50 features, but I'm sure it will grow and could easily contain 100-500 features or more
> - put these extracted features into dataset/table columns over a 10-minute window and run the final machine learning task on that 10 minutes of data
>
> In simple words: I need to apply several quite different map operations to the same single event in the stream, and then combine the results from all map functions into a single vector.
>
> For now I can't figure out how to implement the final reduce step and how to run all feature extraction map jobs in parallel, if possible. I spent several days on the Flink docs site, YouTube videos, googling, and reading Flink's sources, but it seems I'm really stuck here.
>
> The easy solution would be to use a single map operation, run each feature extraction sequentially one by one inside one huge map body, and then return the final vector (Feature1...FeatureK) for each input event. But that seems crazy and suboptimal.
>
> Another solution: for each pair of features use a join, since all feature DataStreams have the same initial event and the same key, and only apply some transformation code. But it looks ugly: writing 50 joins with some window. And I think joins and coGroups were designed for joining different streams from different sources, not for such map/reduce operations.
>
> It seems to me there should be something simple for all these map operations that I'm missing.
>
> Could you please point out how you implement such tasks in Flink, if possible with example code?
>
> PS: I posted this question to stackoverflow.
> PPS: If I use feature1.union(feature2...featureK), I still need to somehow separate and combine the feature vectors before the sink, and preserve the order of the final vectors.
>
> Thanks,
> Andrey
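The fan-out / fan-in dataflow in the numbered steps of the reply above can be made concrete with a small, Flink-free Python simulation: split each event into (eventId, featureId, value) tuples (a flatMap in Flink), transform each tuple independently (the part Flink would parallelize across keys), then regroup by eventId and reassemble an ordered feature vector (keyBy plus window reduce). The squaring "feature extraction" and all function names here are illustrative assumptions, not Flink API:

```python
# Plain-Python sketch (no Flink) of the fan-out / fan-in idea.
from collections import defaultdict

def extract(feature_id, x):
    # Stand-in for the real per-feature extraction code; here just squaring.
    return x * x

def assemble_vectors(events):
    # events: list of raw input vectors, one per incoming event.
    # Step 1: flatMap-style split into (eventId, featureId, value) tuples.
    split = [(eid, fid, x) for eid, vec in enumerate(events)
                           for fid, x in enumerate(vec)]
    # Step 3: per-tuple transformation -- each tuple is independent,
    # so this is the stage a keyed Flink job runs in parallel.
    transformed = [(eid, fid, extract(fid, x)) for eid, fid, x in split]
    # Steps 4-5: key by eventId again and reduce back into one vector.
    grouped = defaultdict(dict)
    for eid, fid, y in transformed:
        grouped[eid][fid] = y
    # Re-order by featureId so the final vector layout is deterministic,
    # which addresses the ordering concern in the PPS above.
    return {eid: [feats[fid] for fid in sorted(feats)]
            for eid, feats in grouped.items()}

print(assemble_vectors([(1, 2, 3), (4, 5, 6)]))
# {0: [1, 4, 9], 1: [16, 25, 36]}
```

In a real job the number of distinct keys (featureId, eventId) would be large, so the transformation stage spreads across all task slots; the trade-off, as noted above, is the extra shuffle and regroup overhead versus simply running more events through one map.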