Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8C3BE200BC3 for ; Fri, 18 Nov 2016 09:41:23 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8AC58160B04; Fri, 18 Nov 2016 08:41:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AD78C160AFE for ; Fri, 18 Nov 2016 09:41:22 +0100 (CET) Received: (qmail 18675 invoked by uid 500); 18 Nov 2016 08:41:16 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 18661 invoked by uid 99); 18 Nov 2016 08:41:16 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 08:41:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E976BC0D64 for ; Fri, 18 Nov 2016 08:41:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.379 X-Spam-Level: ** X-Spam-Status: No, score=2.379 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RCVD_IN_SORBS_SPAM=0.5, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 
jQOXFwwpQD0w for ; Fri, 18 Nov 2016 08:41:11 +0000 (UTC) Received: from mail-wm0-f41.google.com (mail-wm0-f41.google.com [74.125.82.41]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id BD6795F39B for ; Fri, 18 Nov 2016 08:41:10 +0000 (UTC) Received: by mail-wm0-f41.google.com with SMTP id f82so22548070wmf.1 for ; Fri, 18 Nov 2016 00:41:10 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=3tdjtgKB2eEIxZKluN1WImmk3TZHF41Il+LrL+IVm6I=; b=aUNyszWHNhBYTh3p5Y8m2P60XM3okAYABQAdI/nz6E7EQ3shGqGDi0zNOxeih4AQyA ngX/eQ61jOis489b5NkzW4FPK5lLWovAiRtq1pgyzDgkyZ5kjvB9vr+qZB9PE1NgHWOl Pp2BajFpZ4YYWDsGw/z1LVv5n2VMvPInwy0vW2JYaC5rZgJ2/SmtunN1wxmS3Xtp1Rz8 oPDIMknO3lfMhERyocoJvyBIVszvFH9t4r39ghJkmrOZMhoQY46O1Ysa6g/dqeA1U6Lt BlfJ7okcnx3EJ9TO8IYLjinUBI/TyKT3Q/eUvGjfx/+92V60koagFRulOM8f3zY4AN80 YaoA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=3tdjtgKB2eEIxZKluN1WImmk3TZHF41Il+LrL+IVm6I=; b=UlJtenDqtT6SQekLuoPg957hH8toI1b8HJoZzphmi6X0B2OtqWvU7o8ioZgh03kJFQ SJki4xatpL/Uxcrd0menkyDTLmEz28GuuCdZhJkbvzqCXJJalexAEtTwTSrqGMbfRJw8 jrp6OR5kV70CGVRtO/vmv6Q+pXtygAIZtt8HkX7b4zPcmVA+FW1FUXD1/kpb3ZcKq45w yC3EsFoXnZYs7iYiNB44tLI5guJwxW/Q7SGTlM2WvuBegvqh3QLVhfnGXSqGcTVMBFhW bBv8j8Vmnrnwdpwur7tG19+G9UEuRzQuKybx0SYCp2owiUUnt8G4yJWdU6q4ncJHdISL 8aKA== X-Gm-Message-State: AKaTC03Su+/fUVC4gRuvAYTA8D9Mio/D3D3yOxbvnHtiiZbFHX7acnT8Kz1Iu2vJITbjVowwk9SdpxJtuyit/w== X-Received: by 10.46.32.203 with SMTP id g72mr3740960lji.24.1479458466868; Fri, 18 Nov 2016 00:41:06 -0800 (PST) MIME-Version: 1.0 Received: by 10.25.137.4 with HTTP; Fri, 18 Nov 2016 00:40:36 -0800 (PST) In-Reply-To: References: From: Fabian Hueske Date: Fri, 18 Nov 2016 09:40:36 +0100 Message-ID: Subject: Re: Cross product of datastream and dataset To: 
user@flink.apache.org Content-Type: multipart/alternative; boundary=001a1142b65260b0d105418f4429 archived-at: Fri, 18 Nov 2016 08:41:23 -0000 --001a1142b65260b0d105418f4429 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Hi,

it is not possible to mix the DataSet and DataStream APIs at the moment.

If the DataSet is constant and not too big (which I assume, since otherwise crossing would be extremely expensive), you can load the data into a stateful MapFunction.
For that you can implement a RichFlatMapFunction and read the data in open(). For each incoming record, i.e., each call of flatMap(), you cross it with the records in the state and immediately evaluate the condition and count. That way you don't generate too many records.
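
A rough sketch of that idea, written as plain Java without the Flink dependency so it runs on its own. The class name CrossAndCount, the Integer records, the hard-coded reference data, and the "ref > item" condition are all made up for illustration. In a real job, open() and flatMap() would be the overrides of a RichFlatMapFunction, and the count would be emitted through the Collector instead of being returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Dependency-free model of the approach; all names here are hypothetical.
class CrossAndCount {
    private final BiPredicate<Integer, Integer> condition;
    private List<Integer> dataSet;

    CrossAndCount(BiPredicate<Integer, Integer> condition) {
        this.condition = condition;
    }

    // Stand-in for open(): load the constant data set once per instance.
    // Assumption: a real job would read it from a file or a database.
    void open() {
        dataSet = new ArrayList<>(Arrays.asList(1, 5, 10, 20));
    }

    // Stand-in for flatMap(): cross the item with every stored record,
    // evaluate the condition immediately, and keep only the count, so
    // the crossed tuples are never materialized.
    long flatMap(Integer item) {
        long count = 0;
        for (Integer ref : dataSet) {
            if (condition.test(item, ref)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        CrossAndCount fn = new CrossAndCount((item, ref) -> ref > item);
        fn.open();
        for (int item : new int[] {0, 7, 25}) {
            System.out.println(item + " -> " + fn.flatMap(item));
        }
    }
}
```

Note that each parallel instance of the function would hold its own full copy of the data set, which is why this only works while the data set stays small.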

If your DataSet is slowly changing, you can use a stateful CoFlatMapFunction, with one input reading the stream and the other updating the data set.
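
The two-input variant could look roughly like this, again as plain Java without the Flink dependency. The class name UpdatableCrossAndCount, the add/remove update format, and the "ref > item" condition are assumptions for illustration. In a real job, flatMap1() and flatMap2() would be the two methods of a CoFlatMapFunction applied to the connected streams.

```java
import java.util.HashSet;
import java.util.Set;

// Dependency-free model of the two-input approach; names are hypothetical.
class UpdatableCrossAndCount {
    // The slowly changing data set, kept as state inside the function.
    private final Set<Integer> dataSet = new HashSet<>();

    // Input 1: a stream item. Cross it with the current data set and
    // count the records that satisfy the (made-up) condition.
    long flatMap1(int item) {
        long count = 0;
        for (int ref : dataSet) {
            if (ref > item) {
                count++;
            }
        }
        return count;
    }

    // Input 2: an update to the data set (assumed add/remove format).
    void flatMap2(int ref, boolean add) {
        if (add) {
            dataSet.add(ref);
        } else {
            dataSet.remove(ref);
        }
    }

    public static void main(String[] args) {
        UpdatableCrossAndCount fn = new UpdatableCrossAndCount();
        fn.flatMap2(10, true);
        fn.flatMap2(20, true);
        System.out.println("matches for 5: " + fn.flatMap1(5));
        fn.flatMap2(10, false);
        System.out.println("matches for 5 after removal: " + fn.flatMap1(5));
    }
}
```

With a parallelism above one, every parallel instance would need to see the updates, for example by broadcasting the update stream before connecting it. The sketch also leaves out checkpointing of the state.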

Hope this helps,
Fabian


2016-11-17 22:36 GMT+01:00 Charlie Moad <charlie.moad@geofeedia.com>:
We're having trouble mapping our problem to Flink.

- For each incoming item
- Generate tuples of the item crossed with a data set
- Filter the tuples based on a condition
- Know the count of matching tuples

This seems to be a mashup of DataStream and DataSet, but it appears you can't operate with those together. We are also wondering if kicking off a batch job for each incoming item is a feasible approach. We don't have time window constraints.

Any recommendations would be greatly appreciated.

- Charlie


--001a1142b65260b0d105418f4429--