From: Fabian Hueske
Date: Tue, 14 Mar 2017 09:59:50 +0100
Subject: Re: Batch stream Sink delay ?
To: user@flink.apache.org

Hi Paul,

This might be an issue with the watermarks. A window operation can only compute and emit its results once the watermark time passes the end time of the window.
Each operator keeps track of the maximum timestamp received from each of its input tasks and computes its own time as the minimum of those maximum timestamps.
If one (or all) of the watermarks received by the window operator does not pass the end time of the window, the window will not be computed.
When a file input is completely processed, Flink sends a Long.MAX_VALUE watermark, which would explain why execution is triggered only at the end of the job.

I would try to debug the watermarks of your job. The web dashboard provides a few metrics for that.

Best, Fabian

2017-03-14 2:47 GMT+01:00 Paul Smith <psmith@aconex.com>:

> Using Flink 1.2.
>
> Hi all, I have a question about Batch processing and Sinks. I have a
> Flink job that parses a User Request log file containing performance
> data per request. It accumulates metric data values into 15-minute time
> windows. Each log line is mapped to multiple records, so that each of the
> metric values can be 'billed' against a few different categories (User,
> Organisation, Project, Action). The goal of the Flink job is to distil the
> volume of request data into more manageable summaries. We can then see
> breakdowns of, say, User CPU utilised by distinct categories (e.g. Users)
> and look for trends/outliers.
>
> It is working very well as a batch, but with one unexpected behaviour.
>
> What I can't understand is that the Sink does not appear to get _any_
> records until the rest of the chain has completed over the entire file.
> I've played around with parallelism (1->3), and also verified with logging
> that the Sink isn't seeing any data until the entire previous chain is
> complete. Is this expected behaviour? I was thinking that as each Time
> Window passed, a 'block' of the results would be emitted to the sink. Since
> we use Event Time characteristics, the batch job ought to emit these chunks
> as each 15-minute segment passes?
>
> You can see the base job here, with an example of a single log line with
> the metrics here:
>
> https://gist.github.com/tallpsmith/a2e5212547fb3c7220b0e49846d2f152
>
> Each Category I've called an 'identifierType' (User, Organisation..), with
> the 'identifier' the value of that (a UserId for example). I key the
> stream by this pair of records and then Fold the records by the Time window,
> summing up each metric type's value. I have yet to work out the proper use
> case for Fold versus Reduce; I may have got that wrong, but I can't see how
> that changes the flow here.
>
> The output is a beautiful rolled-up summary by 15 minutes by each
> identifierType & identifier. I have yet to attempt a live Streaming
> version of this, but had thought the Batch version would also be concurrent
> and start emitting 15-minute windows as soon as the stream chunks
> transition into the next window. Given the entire job takes about 5
> minutes on my laptop for 8 million raw source lines whose data is spread
> out over an entire day, I would have thought there's some part of that 5
> minutes that would be emitting chunks of summary data to the sink? But
> nothing turns up until the entire job is done.
>
> Maybe the data is just too small.. Maybe there's buffering going on
> somewhere in the chain?
>
> Any pointers would be appreciated in understanding the flow here.
>
> Cheers,
>
> Paul Smith
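The watermark rule Fabian describes can be sketched without Flink at all. The snippet below is a minimal, illustrative simulation (class and variable names are invented, not Flink API): an operator's event time is the minimum over its input channels of the maximum watermark seen on each channel, and a window can only fire once that operator time passes the window's end. It also shows why the final Long.MAX_VALUE watermark sent at end-of-input flushes everything at once.

```python
LONG_MAX = 2**63 - 1  # stands in for the final Long.MAX_VALUE watermark


class WindowSim:
    """Illustrative stand-in for a window operator, not Flink API."""

    def __init__(self, num_inputs, window_end):
        self.max_per_input = [float("-inf")] * num_inputs
        self.window_end = window_end
        self.fired = False

    def on_watermark(self, input_idx, watermark):
        # Track the highest watermark seen on each input channel.
        self.max_per_input[input_idx] = max(self.max_per_input[input_idx], watermark)
        # Operator time = minimum of the per-input maxima.
        operator_time = min(self.max_per_input)
        # The window fires only once operator time passes its end timestamp.
        if operator_time >= self.window_end:
            self.fired = True
        return self.fired


# A window covering [0, 900000) ms, i.e. the first 15 minutes, fed by 2 inputs.
w = WindowSim(num_inputs=2, window_end=900_000)
w.on_watermark(0, 1_000_000)  # input 0 is already past the window end...
assert not w.fired            # ...but input 1 has sent nothing, so no output
w.on_watermark(1, 500_000)    # input 1 is still before the window end
assert not w.fired
w.on_watermark(1, LONG_MAX)   # end-of-input watermark: the window finally fires
assert w.fired
```

This matches the observed behaviour: if some input never advances its watermark past a window's end during the run, that window is only released when the Long.MAX_VALUE watermark arrives at the end of the job.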
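On the Fold-versus-Reduce question raised above: the essential difference, sketched here in plain Python rather than the Flink API (the `fold` helper is hand-rolled for illustration), is that reduce combines elements of the same type, while fold starts from an explicit initial accumulator whose type may differ from the elements.

```python
from functools import reduce

values = [3, 1, 4, 1, 5]

# reduce: (T, T) -> T — the result has the same type as the elements.
total = reduce(lambda a, b: a + b, values)
assert total == 14


def fold(func, initial, items):
    """Hand-rolled fold: (ACC, T) -> ACC, seeded with an initial accumulator."""
    acc = initial
    for item in items:
        acc = func(acc, item)
    return acc


# fold: the accumulator is a summary dict, a different type from the ints.
summary = fold(
    lambda acc, v: {"sum": acc["sum"] + v, "count": acc["count"] + 1},
    {"sum": 0, "count": 0},
    values,
)
assert summary == {"sum": 14, "count": 5}
```

For summing metric values into a per-window summary record, fold's explicit initial accumulator is the natural fit; either way, the choice does not affect when the window's result is emitted.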