flink-user mailing list archives

From Piotr Nowojski <pi...@data-artisans.com>
Subject Re: Correlation between data streams/operators and threads
Date Tue, 14 Nov 2017 13:27:36 GMT
1. It seems like you have one single data source, not one per device. That might make a difference. A single data source followed by a coMap might create one single operator chain. If you want to go this way, please use my suggested solution c), since with a single data source you will have trouble handling watermarks anyway.

3. Nico, can you take a look at this one? Isn’t this a blob server issue?

Piotrek

> On 14 Nov 2017, at 11:35, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
> 
> 1. Okay, I understand. My code is similar to what you demonstrated. I have attached a screenshot of my job plan visualization.
> 
> 3. I have attached the logs and the exception raised after submitting the job (after 15 min, the configured akka timeout).
> 
> Thanks,
> Shailesh
> 
> 
> On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
> Hi,
> 
> 1. 
> I’m not sure what your code looks like. However, I have tested it, and here is an example with multiple streams in one job:
> https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f
> As expected, it created 5 source threads (checked in the debugger) and prints 5 values to the output every second, so clearly those 5 sources are executed simultaneously.
> 
> The number of operators is not related to the number of threads; the number of operator chains is. Simple pipelines like source -> map -> filter -> sink will be chained and executed in one thread; please refer to the documentation link in one of my earlier responses.
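To make the chains-versus-operators point concrete, here is a toy model in plain Java (the `Op` and `ChainModel` classes are hypothetical, not Flink classes). It assumes a simplified chaining rule: an operator joins its predecessor's chain only if the edge between them is a forward (one-to-one) connection and the parallelism is unchanged. Flink's real chaining decision checks further conditions as well, such as the slot sharing group and whether chaining was disabled. Under this assumption, the thread estimate is the sum of the chains' parallelisms.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy description of one operator in a linear pipeline (hypothetical, not a Flink class). */
final class Op {
    final String name;
    final int parallelism;
    /** True if the edge from the previous operator is a one-to-one (forward) connection. */
    final boolean forwardInput;

    Op(String name, int parallelism, boolean forwardInput) {
        this.name = name;
        this.parallelism = parallelism;
        this.forwardInput = forwardInput;
    }
}

final class ChainModel {
    /**
     * Returns the parallelism of each chain, in pipeline order. Simplified rule (an assumption):
     * an operator joins the previous chain only if its input edge is forward and the parallelism
     * is unchanged; otherwise it starts a new chain.
     */
    static List<Integer> chainParallelisms(List<Op> pipeline) {
        List<Integer> chains = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            boolean chainable = previous != null
                    && op.forwardInput
                    && op.parallelism == previous.parallelism;
            if (!chainable) {
                chains.add(op.parallelism); // this operator starts a new chain
            }
            previous = op;
        }
        return chains;
    }

    /** Each chain runs as one task thread per parallel subtask. */
    static int estimateThreads(List<Op> pipeline) {
        int threads = 0;
        for (int p : chainParallelisms(pipeline)) {
            threads += p;
        }
        return threads;
    }
}
```

With this model, source -> map -> filter -> sink at parallelism 1 is a single chain running in one thread, while putting a keyBy (a non-forward edge) in front of a process function at parallelism 2 gives two chains and four threads.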
> 
> Can you share your job code?
> 
> 2. Good point, I forgot to mention that. The job in my example will have 5 operator chains, but because of task slot sharing, they will share one single task slot. In order to distribute such a job with parallelism 1 across the cluster, you have to define a different slot sharing group for each chain:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
> Just set it on the sources.
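As a rough way to reason about slot usage: Flink needs as many slots as the sum, over all slot sharing groups, of the highest parallelism within each group, and an operator inherits its slot sharing group from its inputs when they all share one group, which is why setting it on the sources is enough. In the DataStream API that is a call such as `slotSharingGroup("device-" + i)` on each source. The plain-Java sketch below only models the slot arithmetic; `OpSpec` and `SlotModel` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy (slot sharing group, parallelism) pair for one operator; hypothetical, not a Flink class. */
final class OpSpec {
    final String slotSharingGroup;
    final int parallelism;

    OpSpec(String slotSharingGroup, int parallelism) {
        this.slotSharingGroup = slotSharingGroup;
        this.parallelism = parallelism;
    }
}

final class SlotModel {
    /** Slots needed = sum over slot sharing groups of the highest parallelism in that group. */
    static int requiredSlots(List<OpSpec> operators) {
        Map<String, Integer> maxParallelismPerGroup = new HashMap<>();
        for (OpSpec op : operators) {
            maxParallelismPerGroup.merge(op.slotSharingGroup, op.parallelism, Integer::max);
        }
        int slots = 0;
        for (int p : maxParallelismPerGroup.values()) {
            slots += p;
        }
        return slots;
    }
}
```

Five device pipelines with four parallelism-1 operators each fit into one slot when everything stays in the default group, which is consistent with the web UI showing 3 of 4 slots free, and need five slots once each device gets its own group.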
> 
> 3. Can you show the logs from job manager and task manager?
> 
> 4. As long as you have enough heap memory to run your application/tasks, there is no upper limit on the number of task slots.
> 
> Piotrek 
> 
>> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>> 
>> Hi Piotrek,
>> 
>> I tried out option 'a' mentioned above, but instead of separate jobs, I'm creating separate streams per device. Following is the test deployment configuration as a local cluster (8 GB RAM, 2.5 GHz i5, Ubuntu machine):
>> 
>> akka.client.timeout 15 min
>> jobmanager.heap.mb 1024
>> jobmanager.rpc.address localhost
>> jobmanager.rpc.port 6123
>> jobmanager.web.port 8081
>> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
>> metrics.reporter.jmx.port 8789
>> metrics.reporters jmx
>> parallelism.default 1
>> taskmanager.heap.mb 1024
>> taskmanager.memory.preallocate false
>> taskmanager.numberOfTaskSlots 4
>> 
>> The number of operators per device stream is 4 (one sink function, 3 CEP operators).
>> 
>> Observations (and questions):
>> 
>> 1. The number of threads (captured through JMX) is almost the same as the total number of operators being created. This answers my original question in this thread.
>> 
>> 2. Even though the number of task slots is 4, the web UI shows 3 slots as free. Is this expected? Why are the subtasks not being distributed across slots?
>> 
>> 3. Job deployment hangs (never switches to RUNNING) when the number of devices is greater than 5. Increasing the akka client timeout does not help either. Would deploying separate jobs per device instead of separate streams help here?
>> 
>> 4. Is there an upper limit on the number of task slots that can be configured? I know that my operator state size at any given point in time would not be very high, so it seems OK to deploy independent jobs on the same task manager across slots.
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>> Sure, let us know if you have other questions or encounter some issues.
>> 
>> Thanks, Piotrek
>> 
>> 
>>> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>>> 
>>> Thanks, Piotr. I'll try it out and will get back in case of any further questions.
>>> 
>>> Shailesh
>>> 
>>> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>> 1. It’s a little more complicated than that. Each operator chain (task) will be executed in a separate thread, and the parallelism multiplies that. You can check in the web UI how your job was split into tasks.
>>> 
>>> 3. Yes, that’s true; this is an issue. To preserve the individual watermarks/latencies (assuming that you have some way to calculate them individually for each device), you could either:
>>> 
>>> a) have a separate job per device with parallelism 1. Pros: independent failures/checkpoints. Cons: resource usage (the number of threads increases with the number of devices, and each job also consumes other resources), efficiency.
>>> b) have one job with multiple data streams. Cons: resource usage (threads)
>>> c) ignore Flink’s watermarks and implement your own code in their place. You could read all of your data in a single data stream, keyBy partition/device, and handle the watermark logic manually. You could either try to wrap the CEP/Window operators or copy/paste and modify them to suit your needs.
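A minimal sketch of the core of option c), assuming a bounded-out-of-orderness policy applied per device rather than globally: each device's watermark is the highest timestamp seen from that device minus a fixed bound, and an event counts as late only relative to its own device. `PerDeviceWatermarks` is a hypothetical plain-Java class; inside a Flink job the map would instead be keyed state in a ProcessFunction applied after keyBy on the device ID, and the CEP/window logic would consult these per-device watermarks instead of Flink's.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-device watermark tracker (plain Java, not a Flink API). Each device gets its
 * own watermark: the max timestamp seen for that device minus a fixed out-of-orderness bound.
 */
final class PerDeviceWatermarks {
    private final long maxOutOfOrdernessMs;
    private final Map<String, Long> maxTimestampPerDevice = new HashMap<>();

    PerDeviceWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Current watermark of one device; Long.MIN_VALUE until the device has sent an event. */
    long watermark(String deviceId) {
        Long maxTimestamp = maxTimestampPerDevice.get(deviceId);
        return maxTimestamp == null ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrdernessMs;
    }

    /**
     * Records an event and returns true if it is on time for its own device, or false if its
     * timestamp is at or below that device's current watermark (i.e. late).
     */
    boolean onEvent(String deviceId, long timestampMs) {
        boolean onTime = timestampMs > watermark(deviceId);
        // A late event cannot raise the maximum, so merging unconditionally is safe.
        maxTimestampPerDevice.merge(deviceId, timestampMs, Long::max);
        return onTime;
    }
}
```

For example, with a 5000 ms bound, once device a has reported timestamp 10000 its watermark is 5000, so a later event from device a at 4000 is late. An event from device b at 1000 is still on time, whereas a single global watermark of 5000 would have dropped it.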
>>> 
>>> I would start by trying out a). If it works for your cluster/scale, then that’s fine. If not, try b) (which would share most of the code with a)), and as a last resort try c).
>>> 
>>> Kostas, would you like to add something?
>>> 
>>> Piotrek
>>> 
>>>> On 9 Nov 2017, at 19:16, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>>>> 
>>>> On 1.: is it tied specifically to the number of source operators, or to the number of DataStream objects created? I.e., does the answer change if I read all the data from a single Kafka topic, get a DataStream of all events, and then apply N filters to create N individual streams?
>>>> 
>>>> On 3.: the problem with partitions is that watermarks cannot differ per partition. Since in this use case each stream comes from a device, the latency could differ between devices (although the order will almost always be correct), and there is a high chance of losing events in operators like Patterns that work with windows. Any ideas for workarounds here?
>>>> 
>>>> 
>>>> Thanks,
>>>> Shailesh
>>>> 
>>>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <piotr@data-artisans.com> wrote:
>>>> Hi,
>>>> 
>>>> 1. 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>>> 
>>>> Roughly speaking, the number of executing threads would equal the number of input data streams multiplied by the parallelism.
>>>> 
>>>> 2. 
>>>> Yes, you could dynamically create more data streams at the job startup.
>>>> 
>>>> 3.
>>>> Running 10000 independent data streams on a small cluster (a couple of nodes) will definitely be an issue, since even with parallelism set to 1, there would be quite a lot of unnecessary threads.
>>>> 
>>>> It would be much better to treat your data as a single input data stream with multiple partitions. You could assign partitions to source instances based on the parallelism. For example, with parallelism 6:
>>>> - source 0 could get partitions 0, 6, 12, 18
>>>> - source 1 could get partitions 1, 7, …
>>>> …
>>>> - source 5 could get partitions 5, 11, ...
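The assignment above is a modulo rule: partition p is read by source subtask p % parallelism. A minimal plain-Java sketch follows; `PartitionAssignment.partitionsFor` is a hypothetical helper. In a parallel source function, the subtask index and parallelism would typically come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`, and the total partition count is whatever your data layout provides.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignment {
    /**
     * Returns the partitions read by one source subtask under a modulo rule
     * (hypothetical helper): partition p goes to subtask p % parallelism.
     */
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in [0, parallelism)");
        }
        List<Integer> partitions = new ArrayList<>();
        // Start at this subtask's index and step by the parallelism.
        for (int p = subtaskIndex; p < numPartitions; p += parallelism) {
            partitions.add(p);
        }
        return partitions;
    }
}
```

With parallelism 6 and 24 partitions, subtask 0 gets 0, 6, 12, 18 and subtask 5 gets 5, 11, 17, 23, so every partition is read by exactly one source instance.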
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I'm trying to understand the runtime aspect of Flink when dealing with multiple data streams and multiple operators per data stream.
>>>>> 
>>>>> Use case: N data streams in a single Flink job (each data stream representing 1 device, with different time latencies). Each of these data streams gets split into two streams, one of which goes into a bunch of CEP operators and the other into a process function.
>>>>> 
>>>>> Questions:
>>>>> 1. At runtime, will the engine create one thread per data stream? Or one thread per operator?
>>>>> 2. Is it possible to dynamically create a data stream at runtime when the job starts? (i.e. if N is read from a file when the job starts and the corresponding N streams need to be created)
>>>>> 3. Are there any specific performance impacts when a large number of streams (N ~ 10000) are created, as opposed to N partitions within a single stream?
>>>>> 
>>>>> Are there any internal (design) documents that can help in understanding the implementation details? Any references to the source code would also be really helpful.
>>>>> 
>>>>> Thanks in advance.
>>>>> 
>>>>> Shailesh
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> <Screenshot-2017-11-14 Flink Plan Visualizer.png><flink-shailesh-jobmanager-0-shailesh.log><Exception>

