From: Henri Heiskanen
Date: Mon, 2 Jan 2017 11:25:35 +0200
Subject: Flink streaming questions
To: user@flink.apache.org

Hi,

I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT, and what I would like to accomplish is a stream that reads data from multiple Kafka topics, identifies user sessions, uses an external user profile to enrich the data, evaluates a script to produce session aggregates, and then creates updated profiles from the session aggregates. I am working with high-volume data and user sessions may be long, so using the generic window apply might not work. Below is a simplified version of the stream.

stream = createKafkaStreams(...);
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
stream
        .keyBy(2)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .fold(new SessionData(), new SessionFold(), new ProfilerApply())
        .print();
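
To make the intent of the pipeline above concrete, here is a minimal, Flink-free sketch of why a fold is attractive for long sessions: only a small accumulator is kept per open session instead of every event. It is a simplified model, not the actual SessionData or SessionFold classes. The names SessionFoldSketch, SessionSummary and foldSessions are hypothetical, timestamps are assumed to arrive in order for a single key, and a new session is assumed to start when the gap to the previous event exceeds the configured gap.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, Flink-free model of per-key session folding. All names are hypothetical. */
final class SessionFoldSketch {

    /** Accumulator kept per session: its size does not grow with the session length. */
    static final class SessionSummary {
        final long start;
        long end;
        long eventCount;

        SessionSummary(long firstTimestamp) {
            this.start = firstTimestamp;
            this.end = firstTimestamp;
            this.eventCount = 1;
        }
    }

    /**
     * Folds in-order event timestamps (milliseconds) of ONE key into sessions.
     * Assumption: a new session starts when the gap to the previous event exceeds gapMillis.
     */
    static List<SessionSummary> foldSessions(long[] timestamps, long gapMillis) {
        List<SessionSummary> sessions = new ArrayList<>();
        SessionSummary current = null;
        for (long ts : timestamps) {
            if (current == null || ts - current.end > gapMillis) {
                // Gap exceeded (or first event): open a new session.
                current = new SessionSummary(ts);
                sessions.add(current);
            } else {
                // Same session: update the accumulator, do not store the event.
                current.end = ts;
                current.eventCount++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] timestamps = {0, 2 * minute, 5 * minute, 30 * minute, 31 * minute};
        for (SessionSummary s : foldSessions(timestamps, 10 * minute)) {
            System.out.println(s.start + ".." + s.end + " events=" + s.eventCount);
        }
    }
}
```

With a 10-minute gap, the five sample timestamps fall into two sessions, and memory per session stays constant however many events arrive.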

The questions:

1. Initially, when I used event-time windowing, I could not get any of my windows to close. The reason seemed to be that I had 6 partitions in my test Kafka setup and only 4 of them generated traffic. If I used a parallelism above 4, no windows were closed. Is this by design or a defect? We use flink-connector-kafka-0.10 because earlier versions did not commit the offsets correctly.

2. Rich fold functions are not supported. However, I would like to execute a piece of custom script in the fold function, and that requires an initialisation step. I would have used the open and close lifecycle methods of rich functions, but they are not currently available in fold. What would be the preferred way to run initialisation routines (and shut them down gracefully) when using fold?

3. Somewhat related to the above: I would also like to fetch a user profile from an external source at the beginning of the session. What would be the best practice for that kind of operation? If I were using the generic window apply, I could fetch it at the beginning of the apply method. I was thinking of introducing a mapper that fetches this profile periodically and caches it in Flink state. However, with this setup I would not be able to tie it to the user sessions identified by the windows.

4. I may also have an additional requirement to write out each event enriched with the current session and profile data. I could do this with the generic window function, writing out each event with the collector while iterating, but is there a better pattern to use? Maybe sharing state between functions, or something similar.
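
Regarding question 1, one plausible reading of the symptom can be sketched without Flink. The sketch assumes that each parallel source instance produces its own watermark and that a downstream event-time operator advances only to the minimum of its inputs, so an instance that reads nothing but silent partitions holds everything back. The class and method names are hypothetical, and the firing rule is simplified.

```java
import java.util.Arrays;

/** Simplified, Flink-free model of event time with several inputs. Names are hypothetical. */
final class WatermarkSketch {

    /** Marker for an input that has not produced any watermark yet, e.g. an idle partition. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Assumption: the operator's event-time clock is the minimum of its input watermarks. */
    static long combinedWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    /** Simplification: a window may fire once the combined watermark has reached its end. */
    static boolean windowCanFire(long windowEndMillis, long combinedWatermark) {
        return combinedWatermark >= windowEndMillis;
    }

    public static void main(String[] args) {
        long windowEnd = 600_000L;
        long[] allActive = {700_000L, 650_000L, 900_000L, 610_000L};
        long[] twoIdle = {700_000L, 650_000L, 900_000L, 610_000L, NO_WATERMARK, NO_WATERMARK};

        // Four active inputs: the minimum (610000) is past the window end, prints "true".
        System.out.println(windowCanFire(windowEnd, combinedWatermark(allActive)));
        // Two extra idle inputs pin the minimum at NO_WATERMARK, prints "false".
        System.out.println(windowCanFire(windowEnd, combinedWatermark(twoIdle)));
    }
}
```

Under these assumptions, a parallelism of 4 with 4 active partitions lets every source instance advance, while a higher parallelism leaves some instances with only idle partitions and the window never fires.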

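For question 2, one workaround that might be worth considering is lazy initialisation on the first call, which needs no open method. The sketch below is an assumption about how that could look, not a confirmed Flink recommendation: it uses the standard BinaryOperator interface as a stand-in for a fold function, and ScriptEvaluator is a hypothetical placeholder for the script engine.

```java
import java.util.function.BinaryOperator;

/** Flink-free sketch of lazy initialisation inside a fold-style function. Names are hypothetical. */
final class LazyInitFoldSketch {

    /** Hypothetical stand-in for an expensive-to-create script evaluator. */
    static final class ScriptEvaluator {
        long evaluate(long aggregate, long eventValue) {
            return aggregate + eventValue;
        }
    }

    /** Fold-style function that builds its evaluator on first use instead of in open(). */
    static final class LazyFold implements BinaryOperator<Long> {
        // transient: the evaluator would not be shipped with the function, but rebuilt on first use.
        private transient ScriptEvaluator evaluator;
        int initCount = 0;

        @Override
        public Long apply(Long aggregate, Long eventValue) {
            if (evaluator == null) {
                evaluator = new ScriptEvaluator();
                initCount++;
            }
            return evaluator.evaluate(aggregate, eventValue);
        }
    }

    public static void main(String[] args) {
        LazyFold fold = new LazyFold();
        long aggregate = 0L;
        for (long value : new long[] {1L, 2L, 3L}) {
            aggregate = fold.apply(aggregate, value);
        }
        // Prints "6 after 1 initialisation(s)".
        System.out.println(aggregate + " after " + fold.initCount + " initialisation(s)");
    }
}
```

This covers only the initialisation half of the question. It offers no hook for closing the resource gracefully, so that part remains open.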
Br,
Henri H