flink-user mailing list archives

From Shailesh Jain <shailesh.j...@stellapps.com>
Subject Re: Correlation between data streams/operators and threads
Date Tue, 14 Nov 2017 06:26:28 GMT
Hi Piotrek,

I tried out option 'a' mentioned above, but instead of separate jobs, I'm
creating separate streams per device. The following is the test deployment
configuration for a local cluster (8 GB RAM, 2.5 GHz i5, Ubuntu machine):

akka.client.timeout 15 min
jobmanager.heap.mb 1024
jobmanager.rpc.address localhost
jobmanager.rpc.port 6123
jobmanager.web.port 8081
metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port 8789
metrics.reporters jmx
parallelism.default 1
taskmanager.heap.mb 1024
taskmanager.memory.preallocate false
taskmanager.numberOfTaskSlots 4

The number of Operators per device stream is 4 (one sink function, 3 CEP

Observations (and questions):

1. No. of threads (captured through JMX) is almost the same as the total
number of operators being created. This clears my original question in this

2. Even when the number of task slots is 4, the web UI shows 3 slots as
free. Is this expected? Why are the subtasks not being distributed across

3. Job deployment hangs (never switches to RUNNING) when the number of
devices is greater than 5. Increasing the Akka client timeout does not
help. Would deploying separate jobs per device, instead of separate
streams, help here?

4. Is there an upper limit on the number of task slots that can be
configured? I know that my operator state size at any given point in time
would not be very high, so it looks OK to deploy independent jobs on the
same task manager across slots.
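
For observations 1 and 3, the thread count can be estimated with a minimal sketch. It assumes that no operators are chained, so every operator becomes its own task and every parallel subtask runs in its own thread. The function name and parameters are hypothetical, and Flink's own framework threads are not counted.

```python
def estimate_task_threads(devices: int, operators_per_device: int,
                          parallelism: int = 1, chained: bool = False) -> int:
    """Rough count of task threads for one job with one stream per device.

    Assumption: without chaining, each operator is a separate task and each
    parallel subtask of a task runs in its own thread. If all operators of a
    device stream were chained, each stream would need one thread per
    parallel instance. Framework threads (RPC, network, JMX) are ignored.
    """
    tasks_per_stream = 1 if chained else operators_per_device
    return devices * tasks_per_stream * parallelism


if __name__ == "__main__":
    # 4 operators per device stream and parallelism 1, as in the setup above.
    for n in (1, 5, 10000):
        print(n, "devices ->", estimate_task_threads(n, 4), "task threads")
```

With 5 devices and 4 unchained operators per device this gives 20 task threads, and the count grows linearly with the number of devices.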


On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <piotr@data-artisans.com>
wrote:

> Sure, let us know if you have other questions or encounter some issues.
> Thanks, Piotrek
> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.jain@stellapps.com>
> wrote:
> Thanks, Piotr. I'll try it out and will get back in case of any further
> questions.
> Shailesh
> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <piotr@data-artisans.com>
> wrote:
>> 1.  It’s a little bit more complicated than that. Each operator
>> chain/task will be executed in a separate thread (parallelism
>> multiplies that). You can check in the web UI how your job was split into
>> tasks.
>> 3. Yes that’s true, this is an issue. To preserve the individual
>> watermarks/latencies (assuming that you have some way to calculate them
>> individually for each device), you could either:
>> a) have a separate job for each device with parallelism 1. Pros:
>> independent failures/checkpoints, Cons: resource usage (number of threads
>> increases with number of devices, there are also other resources consumed
>> by each job), efficiency,
>> b) have one job with multiple data streams. Cons: resource usage (threads)
>> c) ignore Flink’s watermarks, and implement your own code in place of them.
>> You could read all of your data in a single data stream, keyBy
>> partition/device and manually handle the watermark logic. You could either try
>> to wrap CEP/Window operators or copy/paste and modify them to suit your
>> needs.
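
The idea behind option c) can be illustrated outside Flink with a minimal sketch: keep one watermark per device key instead of one per stream. This is not Flink API. The class `PerDeviceWatermarks`, its methods, and the bounded-delay rule (watermark = largest timestamp seen for the device minus a fixed delay) are all assumptions made for illustration.

```python
from collections import defaultdict


class PerDeviceWatermarks:
    """Tracks an event-time watermark separately for each device key.

    Assumption: the watermark of a device is the largest timestamp seen for
    that device minus `max_delay_ms` (a bounded out-of-orderness rule).
    """

    def __init__(self, max_delay_ms: int):
        self.max_delay_ms = max_delay_ms
        # Devices that have not produced any event yet start at -infinity.
        self._max_ts = defaultdict(lambda: float("-inf"))

    def watermark(self, device_id: str) -> float:
        return self._max_ts[device_id] - self.max_delay_ms

    def on_event(self, device_id: str, timestamp_ms: int) -> bool:
        """Returns True if the event is on time for its own device.

        The check uses the device's watermark before this event advances it,
        so a slow device is never judged against a faster one.
        """
        on_time = timestamp_ms > self.watermark(device_id)
        self._max_ts[device_id] = max(self._max_ts[device_id], timestamp_ms)
        return on_time


if __name__ == "__main__":
    wm = PerDeviceWatermarks(max_delay_ms=500)
    print(wm.on_event("dev-a", 10_000))  # first event of dev-a
    print(wm.on_event("dev-b", 2_000))   # dev-b lags far behind dev-a
    print(wm.on_event("dev-a", 9_400))   # older than dev-a's watermark
```

In this sketch the event from `dev-b` counts as on time even though it is seconds behind `dev-a`, while the `dev-a` event at 9400 is late because it falls behind that device's own watermark of 9500.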
>> I would start by trying a). If it works for your cluster/scale then
>> that’s fine. If not, try b) (it would share most of the code with a)), and as a
>> last resort try c).
>> Kostas, would you like to add something?
>> Piotrek
>> On 9 Nov 2017, at 19:16, Shailesh Jain <shailesh.jain@stellapps.com>
>> wrote:
>> On 1. - is it tied specifically to the number of source operators or to
>> the number of DataStream objects created? I mean, does the answer change if
>> I read all the data from a single Kafka topic, get a DataStream of all
>> events, and then apply N filters to create N individual streams?
>> On 3. - the problem with partitions is that watermarks cannot be
>> different per partition, and since in this use case each stream is from a
>> device, the latency could be different (but the order will almost always be
>> correct) and there is a high chance of losing events on operators
>> like Patterns which work with windows. Any ideas for workarounds here?
>> Thanks,
>> Shailesh
>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <piotr@data-artisans.com> wrote:
>> Hi,
>> 1.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>> The number of threads executing would be, roughly speaking, equal to the
>> number of input data streams multiplied by the parallelism.
>> 2.
>> Yes, you could dynamically create more data streams at the job startup.
>> 3.
>> Running 10000 independent data streams on a small cluster (a couple of
>> nodes) will definitely be an issue, since even with parallelism set to 1,
>> there would be quite a lot of unnecessary threads.
>> It would be much better to treat your data as a single data input stream
>> with multiple partitions. You could assign partitions between source
>> instances based on parallelism. For example with parallelism 6:
>> - source 0 could get partitions 0, 6, 12, 18
>> - source 1 could get partitions 1, 7, …
>> …
>> - source 5 could get partitions 5, 11, …
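
The assignment listed above follows a round-robin rule, which can be sketched as follows. The function name `assign_partitions` is hypothetical, and the total of 24 partitions is an assumption, since the message only lists the first few partitions per source.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, parallelism: int) -> Dict[int, List[int]]:
    """Round-robin: partition p is read by source instance p % parallelism."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    assignment: Dict[int, List[int]] = {s: [] for s in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


if __name__ == "__main__":
    # 24 partitions is an assumed total, chosen only for illustration.
    for source, partitions in assign_partitions(24, 6).items():
        print(f"source {source}: {partitions}")
```

With this rule the number of source threads depends only on the parallelism, not on the number of partitions or devices.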
>> Piotrek
>> On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.jain@stellapps.com>
>> wrote:
>> Hi,
>> I'm trying to understand the runtime aspect of Flink when dealing with
>> multiple data streams and multiple operators per data stream.
>> Use case: N data streams in a single Flink job (each data stream
>> representing 1 device - with different time latencies), and each of these
>> data streams gets split into two streams, of which one goes into a bunch of
>> CEP operators, and one into a process function.
>> Questions:
>> 1. At runtime, will the engine create one thread per data stream? Or one
>> thread per operator?
>> 2. Is it possible to dynamically create a data stream at runtime when the
>> job starts? (i.e. if N is read from a file when the job starts and
>> corresponding N streams need to be created)
>> 3. Are there any specific performance impacts when a large number of
>> streams (N ~ 10000) are created, as opposed to N partitions within a single
>> stream?
>> Are there any internal (design) documents which can help in understanding
>> the implementation details? Any references to the source will also be
>> really helpful.
>> Thanks in advance.
>> Shailesh
