flink-user mailing list archives

From Shailesh Jain <shailesh.j...@stellapps.com>
Subject Re: Correlation between data streams/operators and threads
Date Tue, 14 Nov 2017 10:35:20 GMT
1. Okay, I understand. My code is similar to what you demonstrated. I have
attached a snapshot of my job plan visualization.

3. I have attached the logs and the exception raised after submitting the
job (after 15 min, the configured Akka timeout).

Thanks,
Shailesh


On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <piotr@data-artisans.com>
wrote:

> Hi,
>
> 1.
> I’m not sure what your code looks like. However, I have tested it, and here
> is an example with multiple streams in one job:
> https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f
> As expected, it created 5 source threads (checked in the debugger) and it
> prints 5 values to the output every second, so clearly those 5 sources
> are executed simultaneously.
>
> The number of threads is not determined by the number of operators, but by
> the number of operator chains. Simple pipelines like source -> map -> filter
> -> sink will be chained and executed in one thread; please refer to the
> documentation link in one of my earlier responses.
>
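To make the "threads follow chains, not operators" point concrete, here is a small stdlib-only Python model (not Flink code). It uses a simplified version of the chaining rule, which is an assumption of this sketch: an operator joins its predecessor's chain only over a forward edge with equal parallelism. Flink's real rule has further conditions (single input, same slot sharing group, chaining not disabled). The `Op` class and the `"hash"` edge label are hypothetical names for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Op:
    name: str
    parallelism: int = 1
    # How records reach this operator from its predecessor: "forward" keeps them
    # in the same subtask; anything else (e.g. "hash" after keyBy) is a shuffle.
    input_edge: str = "forward"


def split_into_chains(pipeline: List[Op]) -> List[List[Op]]:
    """Group a linear pipeline into operator chains (simplified rule:
    chain only over a forward edge with equal parallelism)."""
    chains: List[List[Op]] = []
    for op in pipeline:
        if chains:
            prev = chains[-1][-1]
            if op.input_edge == "forward" and op.parallelism == prev.parallelism:
                chains[-1].append(op)
                continue
        chains.append([op])
    return chains


def thread_count(pipelines: List[List[Op]]) -> int:
    """Each chain runs as one task per parallel subtask, i.e. one thread each."""
    return sum(
        chain[0].parallelism
        for pipeline in pipelines
        for chain in split_into_chains(pipeline)
    )
```

With this model, `source -> map -> filter -> sink` collapses into one chain (one thread), five independent `source -> sink` pipelines give five threads, and a keyBy in front of a CEP operator starts a new chain, so at parallelism 2 that pipeline needs 4 task threads. Only task threads are counted; network and other runtime threads are ignored.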
> Can you share your job code?
>
> 2. Good point, I forgot to mention that. The job in my example will have 5
> operator chains, but because of task slot sharing, they will share one
> single task slot. In order to distribute such a job with parallelism 1 across
> the cluster, you have to define a different slot sharing group for each chain:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
> Just set it on the sources.
>
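The slot arithmetic behind this answer can be sketched as follows (a stdlib-only model, not Flink code). Subtasks in the same slot sharing group may share a slot, so each group needs as many slots as its highest parallelism, and different groups never share slots. In the Java DataStream API the group is set with `slotSharingGroup("...")` on an operator, and downstream operators inherit their input's group, which is why setting it on the sources is enough. The function name and the group names used below are hypothetical.

```python
from collections import defaultdict
from typing import Iterable, Tuple


def required_slots(chains: Iterable[Tuple[str, int]]) -> int:
    """Estimate the task slots a job occupies.

    `chains` holds one (slot_sharing_group, parallelism) pair per operator chain.
    Each group needs as many slots as its highest parallelism; groups are summed
    because subtasks from different groups never share a slot.
    """
    max_parallelism = defaultdict(int)
    for group, parallelism in chains:
        max_parallelism[group] = max(max_parallelism[group], parallelism)
    return sum(max_parallelism.values())
```

Five parallelism-1 chains in the default group fit into a single slot, which matches seeing 3 of 4 configured slots free; giving each source its own group (say `device-0` to `device-4`) raises the requirement to 5 slots.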
> 3. Can you show the logs from job manager and task manager?
>
> 4. As long as you have enough heap memory to run your application/tasks,
> there is no upper limit on the number of task slots.
>
> Piotrek
>
> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.jain@stellapps.com>
> wrote:
>
> Hi Piotrek,
>
> I tried out option 'a' mentioned above, but instead of separate jobs, I'm
> creating separate streams per device. The following is the test deployment
> configuration for a local cluster (8 GB RAM, 2.5 GHz i5, Ubuntu machine):
>
> akka.client.timeout 15 min
> jobmanager.heap.mb 1024
> jobmanager.rpc.address localhost
> jobmanager.rpc.port 6123
> jobmanager.web.port 8081
> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port 8789
> metrics.reporters jmx
> parallelism.default 1
> taskmanager.heap.mb 1024
> taskmanager.memory.preallocate false
> taskmanager.numberOfTaskSlots 4
>
> The number of operators per device stream is 4 (one sink function, 3 CEP
> operators).
>
> Observations (and questions):
>
> 1. The number of threads (captured through JMX) is almost the same as the
> total number of operators being created. This answers my original question
> in this thread.
>
> 2. Even though the number of task slots is 4, the web UI shows 3 slots as
> free. Is this expected? Why are the subtasks not being distributed across
> slots?
>
> 3. Job deployment hangs (never switches to RUNNING) when the number of
> devices is greater than 5. Increasing the Akka client timeout does not
> help either. Would deploying separate jobs per device, instead of separate
> streams, help here?
>
> 4. Is there an upper limit on the number of task slots which can be
> configured? I know that my operator state size at any given point in time
> would not be very high, so it seems OK to deploy independent jobs on the
> same task manager across slots.
>
> Thanks,
> Shailesh
>
>
> On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <piotr@data-artisans.com>
> wrote:
>
>> Sure, let us know if you have other questions or encounter some issues.
>>
>> Thanks, Piotrek
>>
>>
>> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.jain@stellapps.com>
>> wrote:
>>
>> Thanks, Piotr. I'll try it out and will get back in case of any further
>> questions.
>>
>> Shailesh
>>
>> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <piotr@data-artisans.com>
>> wrote:
>>
>>> 1. It’s a little bit more complicated than that. Each operator
>>> chain/task will be executed in a separate thread (parallelism
>>> multiplies that). You can check in the web UI how your job was split into
>>> tasks.
>>>
>>> 3. Yes, that’s true; this is an issue. To preserve the individual
>>> watermarks/latencies (assuming that you have some way to calculate them
>>> individually for each device), you could either:
>>>
>>> a) have a separate job per device with parallelism 1. Pros:
>>> independent failures/checkpoints. Cons: resource usage (the number of
>>> threads increases with the number of devices, and each job also consumes
>>> other resources), efficiency.
>>> b) have one job with multiple data streams. Cons: resource usage
>>> (threads).
>>> c) ignore Flink’s watermarks and implement your own code in their place.
>>> You could read all of your data in a single data stream, keyBy
>>> partition/device, and handle the watermark logic manually. You could
>>> either try to wrap the CEP/Window operators or copy/paste and modify them
>>> to suit your needs.
>>>
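Option c) boils down to per-key watermark bookkeeping. The following stdlib-only Python sketch shows one way that bookkeeping could look; it is not Flink code, and the bounded-out-of-orderness policy, the class name, and the "drop late events into a list" behaviour are all assumptions. In a Flink job, the equivalent state would live in keyed state inside an operator applied after `keyBy` on the device id.

```python
import heapq
from collections import defaultdict


class PerDeviceWatermarks:
    """Hypothetical helper: one bounded-out-of-orderness watermark per device.

    Events are buffered per device and released in timestamp order once that
    device's own watermark passes them, so a slow device never makes a fast
    device's events late (and vice versa).
    """

    def __init__(self, max_out_of_orderness):
        self.bound = max_out_of_orderness
        self.max_ts = {}                   # device -> highest timestamp seen
        self.buffers = defaultdict(list)   # device -> min-heap of (ts, seq, event)
        self.late = []                     # events behind their device's watermark
        self._seq = 0                      # tie-breaker so events are never compared

    def watermark(self, device):
        if device not in self.max_ts:
            return float("-inf")           # nothing seen yet for this device
        return self.max_ts[device] - self.bound

    def on_event(self, device, ts, event):
        """Buffer one event; return [(ts, event), ...] now ready for this device."""
        if ts <= self.watermark(device):
            self.late.append((device, ts, event))
            return []
        self.max_ts[device] = max(self.max_ts.get(device, ts), ts)
        heapq.heappush(self.buffers[device], (ts, self._seq, event))
        self._seq += 1
        wm, buf, ready = self.watermark(device), self.buffers[device], []
        while buf and buf[0][0] <= wm:
            ts_ready, _, ev = heapq.heappop(buf)
            ready.append((ts_ready, ev))
        return ready
```

Because each device advances its own watermark, an event from a lagging device is only judged against that device's history, which is exactly what a single stream-wide watermark cannot do.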
>>> I would start by trying out a). If it works for your cluster/scale,
>>> then that’s fine. If not, try b) (it would share most of the code with
>>> a)), and as a last resort try c).
>>>
>>> Kostas, would you like to add something?
>>>
>>> Piotrek
>>>
>>> On 9 Nov 2017, at 19:16, Shailesh Jain <shailesh.jain@stellapps.com>
>>> wrote:
>>>
>>> On 1. - is it tied specifically to the number of source operators or to
>>> the number of DataStream objects created? I mean, does the answer change if
>>> I read all the data from a single Kafka topic, get a DataStream of all
>>> events, and then apply N filters to create N individual streams?
>>>
>>> On 3. - the problem with partitions is that watermarks cannot be
>>> different per partition, and since in this use case each stream is from a
>>> device, the latency could be different (but the order will almost always
>>> be correct), and there is a high chance of losing events in operators
>>> like Patterns which work with windows. Any ideas for workarounds here?
>>>
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <piotr@data-artisans.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> 1.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>>
>>> The number of threads executing would be, roughly speaking, equal to the
>>> number of input data streams multiplied by the parallelism.
>>>
>>> 2.
>>> Yes, you could dynamically create more data streams at the job startup.
>>>
>>> 3.
>>> Running 10000 independent data streams on a small cluster (a couple of
>>> nodes) will definitely be an issue, since even with parallelism set to 1,
>>> there would be quite a lot of unnecessary threads.
>>>
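A back-of-the-envelope comparison shows why the thread count explodes with separate streams. This is only arithmetic under stated assumptions: each device pipeline is assumed to form 2 operator chains (for example a source chain and a keyed CEP chain), and only task threads are counted. The function names are hypothetical.

```python
def threads_separate_streams(num_devices, chains_per_stream, parallelism=1):
    # One independent pipeline per device; every chain of every pipeline
    # contributes `parallelism` task threads.
    return num_devices * chains_per_stream * parallelism


def threads_partitioned_stream(chains, parallelism):
    # One shared pipeline for all devices; the device count does not appear,
    # so the thread count depends only on chains and parallelism.
    return chains * parallelism
```

Under these assumptions, 10000 per-device pipelines give 20000 task threads even at parallelism 1, while a single partitioned pipeline with the same 2 chains at parallelism 6 needs 12.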
>>> It would be much better to treat your data as a single data input stream
>>> with multiple partitions. You could assign partitions between source
>>> instances based on parallelism. For example with parallelism 6:
>>> - source 0 could get partitions 0, 6, 12, 18
>>> - source 1 could get partitions 1, 7, …
>>> …
>>> - source 5 could get partitions 5, 11, ...
>>>
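The round-robin assignment in the example above can be written as a short function: partition `p` goes to source subtask `p % parallelism`. This is an illustration of the scheme only; Flink's Kafka connector has its own partition assignment logic, and the function name here is hypothetical.

```python
def assign_partitions(num_partitions, parallelism):
    """Return, per source subtask index, the list of partitions it reads.

    Partition p is assigned to subtask p % parallelism (round-robin), so every
    partition is read by exactly one subtask.
    """
    assignment = [[] for _ in range(parallelism)]
    for p in range(num_partitions):
        assignment[p % parallelism].append(p)
    return assignment
```

With 24 partitions and parallelism 6, `assign_partitions(24, 6)[0]` is `[0, 6, 12, 18]`, matching source 0 in the example, and source 5 starts with partitions 5 and 11.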
>>> Piotrek
>>>
>>> On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.jain@stellapps.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to understand the runtime aspect of Flink when dealing with
>>> multiple data streams and multiple operators per data stream.
>>>
>>> Use case: N data streams in a single Flink job (each data stream
>>> representing 1 device, with different time latencies), and each of these
>>> data streams gets split into two streams, of which one goes into a bunch of
>>> CEP operators, and one into a process function.
>>>
>>> Questions:
>>> 1. At runtime, will the engine create one thread per data stream? Or one
>>> thread per operator?
>>> 2. Is it possible to dynamically create a data stream at runtime when
>>> the job starts? (i.e. if N is read from a file when the job starts and
>>> corresponding N streams need to be created)
>>> 3. Are there any specific performance impacts when a large number of
>>> streams (N ~ 10000) are created, as opposed to N partitions within a single
>>> stream?
>>>
>>> Are there any internal (design) documents which can help in understanding
>>> the implementation details? Any references to the source code would also
>>> be really helpful.
>>>
>>> Thanks in advance.
>>>
>>> Shailesh
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
