Subject: Re: Multiple windows with large number of partitions
From: Aljoscha Krettek
Date: Thu, 28 Apr 2016 09:33:50 +0000
To: user@flink.apache.org

Hi,
Is there a reason for keying on both the "date only" field and the "userid"? I think you should be fine by just specifying that you want 1-day windows on your timestamps.
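Something like the following is what I have in mind (just an untested sketch against the Tuple3<DateTime, String, String> layout from your code below): key on the userid alone and let the 1-day event-time window provide the per-day grouping.

DataStream<Tuple3<DateTime, String, String>> firstLoginPerDay = logins
    .keyBy(2)                 // userid only
    .timeWindow(Time.days(1)) // the day is derived from the event timestamp
    .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
        @Override
        public Tuple3<DateTime, String, String> reduce(
                Tuple3<DateTime, String, String> e1,
                Tuple3<DateTime, String, String> e2) {
            // keep the earliest login per user per day
            return e1.f0.isBefore(e2.f0) ? e1 : e2;
        }
    });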

Also, do you have a timestamp extractor in place that takes the timestamp from your data and sets it as the internal timestamp field? This is explained in more detail here: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
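For reference, a minimal sketch of such an assigner (again assuming the Tuple3<DateTime, String, String> records from your code, with the Joda DateTime in field f0; the 1-minute out-of-orderness bound is just a placeholder):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

DataStream<Tuple3<DateTime, String, String>> withTimestamps = logins
    .assignTimestampsAndWatermarks(
        new AssignerWithPeriodicWatermarks<Tuple3<DateTime, String, String>>() {
            private final long maxOutOfOrderness = 60000L; // 1 minute, a placeholder
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(Tuple3<DateTime, String, String> element,
                                         long previousElementTimestamp) {
                // use the event's own timestamp as Flink's internal timestamp
                long ts = element.f0.getMillis();
                maxTimestamp = Math.max(maxTimestamp, ts);
                return ts;
            }

            @Override
            public Watermark getCurrentWatermark() {
                // trail the highest timestamp seen so far by the bound
                return new Watermark(maxTimestamp == Long.MIN_VALUE
                        ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness);
            }
        });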

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 06:04 Christopher Santiago <chris@ninjametrics.com> wrote:
I've been working through the Flink demo applications and started in on a prototype, but have run into an issue with how to approach the problem of getting a daily unique user count from a traffic stream. I'm using the event-time time characteristic.

Sample event stream (timestamp, userid):
2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:13:21.003Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:37:48.003Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:37:48.004Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T02:02:15.004Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T00:00:00.000Z,5dd63d9756a975d0f4be6a6856005381
2015-12-02T00:16:58.000Z,5dd63d9756a975d0f4be6a6856005381
2015-12-02T00:00:00.000Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:14:56.000Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:14:56.001Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:29:52.001Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:29:52.002Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:44:48.002Z,ccd72e4535c92c499bb66eea6f4f9aab

Requirements:
1. Get a count of the unique users by day.
2. Early results should be emitted as quickly as possible. (I've been trying to use 30/60-second windows.)
3. Events are accepted up to 2 days late.

I've used the following as guidance:

EventTimeTriggerWithEarlyAndLateFiring
https://raw.githubusercontent.com/kl0u/flink-examples/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

Multi-window transformations
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CBAY182-W870B521BDC5709973990D5ED9A0%40phx.gbl%3E
Summing over aggregate windows
http://stackoverflow.com/questions/36791816/how-to-declare-1-minute-tumbling-window
but I can't get the reduce/aggregation logic working correctly. Here's a sample of how I have the windows set up, with a DataStream of Tuple3 holding timestamp, date only, and userid:

DataStream<Tuple3<DateTime, String, String>> uniqueLogins = logins
    .keyBy(1, 2)
    .timeWindow(Time.days(1))
    // Same as EventTimeTriggerWithEarlyAndLateFiring, just with modified
    // logging, since it potentially fires 2-3x per event read in
    .trigger(EventTimeNoLog.create()
        .withEarlyFiringEvery(Time.seconds(60))
        .withLateFiringEvery(Time.seconds(600))
        .withAllowedLateness(Time.days(2)))
    // Reduce to the earliest timestamp for a given day for a user
    .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
        public Tuple3<DateTime, String, String> reduce(
                Tuple3<DateTime, String, String> event1,
                Tuple3<DateTime, String, String> event2) {
            return event1;
        }
    });

SingleOutputStreamOperator<Tuple2<String, Long>> window = uniqueLogins
    .timeWindowAll(Time.days(1))
    // Modified EventTimeTriggerWithEarlyAndLateFiring that does a
    // FIRE_AND_PURGE in onProcessingTime when aggregator is set
    .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
        .withEarlyFiringEvery(Time.seconds(60))
        .withLateFiringEvery(Time.seconds(600))
        .withAllowedLateness(Time.days(2))
        .aggregator())
    // Manually count
    .apply(new AllWindowFunction<Tuple3<DateTime, String, String>, Tuple2<String, Long>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window,
                Iterable<Tuple3<DateTime, String, String>> input,
                Collector<Tuple2<String, Long>> out) throws Exception {
            int count = 0;
            String windowTime = null;

            for (Tuple3<DateTime, String, String> login : input) {
                windowTime = login.f1;
                count++;
            }
            out.collect(new Tuple2<String, Long>(windowTime, Long.valueOf(count)));
        }
    });

From the logging I've put in place, it seems there is a performance issue with the first keyBy: there is now a unique window for each date/user combination (around 500k windows in my sample data), and when reducing it does not emit results at a constant enough rate for the second window to perform its aggregation at a schedulable interval. Is there a better approach to performing this type of calculation directly in Flink?