From: Shailesh Jain
Date: Thu, 9 Nov 2017 23:46:26 +0530
Subject: Re: Correlation between data streams/operators and threads
To: Piotr Nowojski
Cc: user@flink.apache.org

On 1: is it tied specifically to the number of source operators, or to the number of DataStream objects created? I mean, does the answer change if I read all the data from a single Kafka topic, get a DataStream of all events, and then apply N filters to create N individual streams?

On 3: the problem with partitions is that watermarks cannot be different per partition. Since in this use case each stream comes from a device, the latency can differ between devices (though the order within a device is almost always correct), so there is a high chance of losing events in operators like Patterns that work with windows. Any ideas for workarounds here?
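A simplified, Flink-independent sketch of the concern above. It assumes a single source reading all devices and a bounded-out-of-orderness rule (watermark = largest timestamp seen so far minus a fixed bound); the class, method names and sample data are hypothetical. Device "b" delivers its readings in order but about 10 s behind device "a", so every "b" reading lands below the shared watermark, while a per-device watermark (shown only for comparison, not as a Flink feature) would accept all of them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SharedWatermarkDemo {
    // One reading from a device, with its event-time timestamp in milliseconds.
    static final class Event {
        final String device;
        final long timestamp;

        Event(String device, long timestamp) {
            this.device = device;
            this.timestamp = timestamp;
        }
    }

    // Simplified rule: watermark = max timestamp seen so far - maxOutOfOrderness,
    // shared by every device read through the same source.
    // An event whose timestamp is below the current watermark counts as late.
    static int lateWithSharedWatermark(List<Event> arrivals, long maxOutOfOrderness) {
        long maxSeen = Long.MIN_VALUE;
        int late = 0;
        for (Event e : arrivals) {
            if (maxSeen != Long.MIN_VALUE && e.timestamp < maxSeen - maxOutOfOrderness) {
                late++;
            }
            maxSeen = Math.max(maxSeen, e.timestamp);
        }
        return late;
    }

    // Same rule, but each device keeps its own maximum (hypothetical comparison).
    static int lateWithPerDeviceWatermark(List<Event> arrivals, long maxOutOfOrderness) {
        Map<String, Long> maxSeen = new HashMap<>();
        int late = 0;
        for (Event e : arrivals) {
            Long deviceMax = maxSeen.get(e.device);
            if (deviceMax != null && e.timestamp < deviceMax - maxOutOfOrderness) {
                late++;
            }
            maxSeen.merge(e.device, e.timestamp, Long::max);
        }
        return late;
    }

    // Device "a" is current; device "b" lags ~10 s but is internally in order.
    static List<Event> sampleArrivals() {
        return Arrays.asList(
            new Event("a", 10_000), new Event("b", 0),
            new Event("a", 11_000), new Event("b", 1_000),
            new Event("a", 12_000), new Event("b", 2_000));
    }

    public static void main(String[] args) {
        System.out.println(lateWithSharedWatermark(sampleArrivals(), 1_000));    // 3
        System.out.println(lateWithPerDeviceWatermark(sampleArrivals(), 1_000)); // 0
    }
}
```

Lateness is reduced here to "timestamp below the current watermark"; real window and CEP operators apply their own lateness rules on top of the watermark.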


Thanks,
Shailesh

On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <piotr@data-artisans.com> wrote:

Hi,

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
The number of executing threads would, roughly speaking, be equal to the number of input data streams multiplied by the parallelism.
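Piotr's rule of thumb written out as arithmetic and applied to the N ~ 10000 case from the original question. This is only the rough estimate stated in the reply; it deliberately ignores operator chaining and Flink's other runtime threads, and the class and method names are made up for illustration.

```java
class ThreadEstimate {
    // Rough estimate only: threads ~ input data streams * parallelism.
    // Does not model operator chaining or non-task threads.
    static long estimatedTaskThreads(long inputStreams, int parallelism) {
        return inputStreams * parallelism;
    }

    public static void main(String[] args) {
        // N = 10000 independent per-device streams, parallelism 1
        System.out.println(estimatedTaskThreads(10_000, 1)); // 10000
        // One stream carrying all devices as partitions, parallelism 6
        System.out.println(estimatedTaskThreads(1, 6)); // 6
    }
}
```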

2.
Yes, you could dynamically create more data streams at job startup.
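Because N only has to be known before the job graph is built, it can be an ordinary value computed in main(). A minimal sketch without Flink dependencies, assuming N is stored under a hypothetical `device.count` key in a properties-style file (passed here as a string); the class and method names are also hypothetical. In a real job, the loop body is where one would call `env.addSource(...)` on the `StreamExecutionEnvironment` and attach that device's operators, all before `env.execute()`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class StartupStreamPlan {
    // Parses N from configuration text read at job startup.
    // The key name "device.count" is a hypothetical example.
    static int readDeviceCount(String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("device.count");
        if (value == null) {
            throw new IllegalArgumentException("missing device.count");
        }
        return Integer.parseInt(value.trim());
    }

    // One hypothetical pipeline name per device. In a Flink job, this loop is
    // where each device's source and operators would be added to the graph.
    static List<String> planPipelines(int deviceCount) {
        List<String> pipelines = new ArrayList<>();
        for (int i = 0; i < deviceCount; i++) {
            pipelines.add("device-" + i);
        }
        return pipelines;
    }

    public static void main(String[] args) {
        int n = readDeviceCount("device.count = 3\n");
        System.out.println(planPipelines(n)); // [device-0, device-1, device-2]
    }
}
```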

3.
Running 10000 independent data streams on a small cluster (a couple of nodes) will definitely be an issue: even with parallelism set to 1, there would be a lot of unnecessary threads.

It would be much better to treat your data as a single input data stream with multiple partitions. You could assign partitions to source instances based on the parallelism. For example, with parallelism 6:
- source 0 could get partitions 0, 6, 12, 18
- source 1 could get partitions 1, 7, …
…
- source 5 could get partitions 5, 11, ...
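The round-robin scheme in this example can be written as a small function: instance i of P owns partitions i, i + P, i + 2P, ..., which is the same as saying partition k belongs to instance k mod P. A sketch assuming 24 partitions, so that source 0's list matches the example; the class and method names are hypothetical. In a parallel Flink source, the two inputs would typically come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignment {
    // Partitions owned by one source instance when numPartitions partitions
    // are spread round-robin over `parallelism` instances: i, i + p, i + 2p, ...
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in [0, parallelism)");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = subtaskIndex; p < numPartitions; p += parallelism) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int parallelism = 6;
        int numPartitions = 24; // assumed partition count
        for (int i = 0; i < parallelism; i++) {
            System.out.println("source " + i + " -> " + partitionsFor(i, parallelism, numPartitions));
        }
    }
}
```

Because the mapping is a pure function of the partition number, each instance can compute its own share independently, without coordinating with the others.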

Piotrek

On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.jain@stellapps.com> wrote:

Hi,

I'm trying to understand the runtime aspects of Flink when dealing with multiple data streams and multiple operators per data stream.

Use case: N data streams in a single Flink job (each data stream represents one device, with different time latencies). Each of these data streams gets split into two streams: one goes into a bunch of CEP operators, the other into a process function.

Questions:
1. At runtime, will the engine create one thread per data stream, or one thread per operator?
2. Is it possible to dynamically create a data stream at runtime when the job starts? (i.e. if N is read from a file when the job starts and the corresponding N streams need to be created)
3. Are there any specific performance impacts when a large number of streams (N ~ 10000) are created, as opposed to N partitions within a single stream?

Are there any internal (design) documents that can help in understanding the implementation details? Any references to the source code would also be really helpful.

Thanks in advance.

Shailesh