flink-user mailing list archives

From Shailesh Jain <shailesh.j...@stellapps.com>
Subject Re: Correlation between data streams/operators and threads
Date Fri, 17 Nov 2017 05:15:07 GMT
Bump.

On Wed, Nov 15, 2017 at 12:34 AM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:

> 1. Single data source because I have one Kafka topic where all events get
> published. But I am creating multiple data streams by applying a series of
> filter operations on the single input stream, to generate device-specific
> data streams, and then assigning the watermarks on each stream. Will this
> not result in downstream operators (for a particular device-specific
> stream) getting correct device-specific watermarks?
>
> Job code:
>
> // eventStream initially contains all events from all devices
>
>         for (int i = 0; i < TOTAL_DEVICES; i++) {
>             DataStream<Event> deviceOnlyEvents = eventStream
>                     .filter(new DeviceFilter(i))
>                     .assignTimestampsAndWatermarks(
>                             new EventTimeStampExtractor(Time.milliseconds(1)))
>                     .setParallelism(1);
>             // apply CEP operators, and generate derived events
>             DataStream<Event> derivedEvents =
>                     PatternCreator.addPatternsOnStream(deviceOnlyEvents, appId);
>             // also pass the stream through a process function (this gets
>             // chained with the source operator as you had mentioned above)
>             DataStream<Event> stateTransitionEvents = deviceOnlyEvents
>                     .process(new StateMachineOperator(appId))
>                     .setParallelism(1);
>             // add sink to the new event streams
>             derivedEvents.union(stateTransitionEvents).addSink(kafkaSink);
>         }
>
> Comments?
>
> Thanks,
> Shailesh
>
>
> On Tue, Nov 14, 2017 at 6:57 PM, Piotr Nowojski <piotr@data-artisans.com>
> wrote:
>
>> 1. It seems like you have one single data source, not one per device.
>> That might make a difference. A single data source followed by comap might
>> create one single operator chain. If you want to go this way, please use my
>> suggested solution c), since you will have trouble handling watermarks
>> with a single data source anyway.
>>
>> 3. Nico, can you take a look at this one? Isn’t this a blob server issue?
>>
>> Piotrek
>>
>> On 14 Nov 2017, at 11:35, Shailesh Jain <shailesh.jain@stellapps.com>
>> wrote:
>>
>> 1. Okay, I understand. My code is similar to what you demonstrated. I
>> have attached a snap of my job plan visualization.
>>
>> 3. Have attached the logs and exception raised (15min - configured akka
>> timeout) after submitting the job.
>>
>> Thanks,
>> Shailesh
>>
>>
>> On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <piotr@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> 1.
>>> I’m not sure what your code looks like. However, I have tested it, and here
>>> is an example with multiple streams in one job:
>>> https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f
>>> As expected, it created 5 source threads (checked in the debugger) and
>>> prints 5 values to the output every second, so clearly those 5 sources
>>> are executed simultaneously.
>>>
>>> The number of operators is not related to the number of threads. The number
>>> of operator chains is. Simple pipelines like source -> map -> filter -> sink
>>> will be chained and executed in one thread; please refer to the
>>> documentation link in one of my earlier responses.
>>>
>>> Can you share your job code?
>>>
>>> 2. Good point, I forgot to mention that. The job in my example will have
>>> 5 operator chains, but because of task slot sharing, they will share one
>>> single task slot. In order to distribute such a job with parallelism 1 across
>>> the cluster, you have to define a different slot sharing group for each chain:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
>>> Just set it on the sources.
>>>
>>> 3. Can you show the logs from job manager and task manager?
>>>
>>> 4. As long as you have enough heap memory to run your application/tasks,
>>> there is no upper limit on the number of task slots.
>>>
>>> Piotrek
>>>
>>> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.jain@stellapps.com>
>>> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> I tried out option 'a' mentioned above, but instead of separate jobs,
>>> I'm creating separate streams per device. Following is the test deployment
>>> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
>>>
>>> akka.client.timeout 15 min
>>> jobmanager.heap.mb 1024
>>> jobmanager.rpc.address localhost
>>> jobmanager.rpc.port 6123
>>> jobmanager.web.port 8081
>>> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
>>> metrics.reporter.jmx.port 8789
>>> metrics.reporters jmx
>>> parallelism.default 1
>>> taskmanager.heap.mb 1024
>>> taskmanager.memory.preallocate false
>>> taskmanager.numberOfTaskSlots 4
>>>
>>> The number of Operators per device stream is 4 (one sink function, 3 CEP
>>> operators).
>>>
>>> Observations (and questions):
>>>
>>> 1. No. of threads (captured through JMX) is almost the same as the total
>>> number of operators being created. This clears my original question in this
>>> thread.
>>>
>>> 2. Even when the number of task slots is 4, on web ui, it shows 3 slots
>>> as free. Is this expected? Why are the subtasks not being distributed
>>> across slots?
>>>
>>> 3. Job deployment hangs (never switches to RUNNING) when the number of
>>> devices is greater than 5. Increasing the akka client timeout does not
>>> help. Would deploying separate jobs per device, instead of separate
>>> streams, help here?
>>>
>>> 4. Is there an upper limit on the number of task slots which can be configured?
>>> I know that my operator state size at any given point in time would not be
>>> very high, so it looks OK to deploy independent jobs which can be deployed
>>> on the same task manager across slots.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>>
>>> On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>>
>>>> Sure, let us know if you have other questions or encounter some issues.
>>>>
>>>> Thanks, Piotrek
>>>>
>>>>
>>>> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.jain@stellapps.com>
>>>> wrote:
>>>>
>>>> Thanks, Piotr. I'll try it out and will get back in case of any further
>>>> questions.
>>>>
>>>> Shailesh
>>>>
>>>> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>>>
>>>>> 1.  It’s a little bit more complicated than that. Each operator
>>>>> chain/task will be executed in a separate thread (parallelism
>>>>> multiplies that). You can check in the web UI how your job was split into
>>>>> tasks.
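
As a rough illustration of the rule above (one thread per operator chain, multiplied by parallelism), here is a back-of-the-envelope calculation in plain Java. It has no Flink dependency. The class name `ThreadEstimate` and the figure of 2 chains per device stream are assumptions chosen only for illustration, and the result counts task threads only, not any auxiliary threads the runtime or connectors may start.

```java
class ThreadEstimate {
    /** Task threads = number of operator chains (tasks) x parallelism. */
    static long taskThreads(long operatorChains, int parallelism) {
        return operatorChains * parallelism;
    }

    public static void main(String[] args) {
        int devices = 10_000;     // N from the original question
        int chainsPerDevice = 2;  // assumption, for illustration only
        int parallelism = 1;
        long chains = (long) devices * chainsPerDevice;
        System.out.println("estimated task threads: " + taskThreads(chains, parallelism));
    }
}
```

The actual number of chains per device stream depends on how the job is split into tasks, which is what the web UI shows.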
>>>>>
>>>>> 3. Yes that’s true, this is an issue. To preserve the individual
>>>>> watermarks/latencies (assuming that you have some way to calculate them
>>>>> individually per each device), you could either:
>>>>>
>>>>> a) have separate jobs per each device with parallelism 1. Pros:
>>>>> independent failures/checkpoints, Cons: resource usage (number of threads
>>>>> increases with number of devices, there are also other resources consumed
>>>>> by each job), efficiency,
>>>>> b) have one job with multiple data streams. Cons: resource usage
>>>>> (threads)
>>>>> c) ignore Flink’s watermarks, and implement your own code in place of
>>>>> it. You could read all of your data in a single data stream, keyBy
>>>>> partition/device and manually handle the watermark logic. You could either
>>>>> try to wrap CEP/Window operators or copy/paste and modify them to suit
>>>>> your needs.
>>>>>
>>>>> I would start by trying out a). If it works for your cluster/scale
>>>>> then that’s fine. If not, try b) (which would share most of the code
>>>>> with a)), and as a last resort try c).
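
To make option c) a little more concrete, below is a minimal sketch of per-device watermark bookkeeping in plain Java. It is not Flink API and not taken from this thread: the class `PerDeviceWatermarks`, its methods, and the "max timestamp seen minus a fixed out-of-orderness bound" rule are all assumptions for illustration. In a real job this bookkeeping would presumably live in keyed state behind the keyBy, rather than in an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

class PerDeviceWatermarks {
    private final long maxOutOfOrdernessMs;
    // Highest event timestamp seen so far, tracked separately per device.
    private final Map<Integer, Long> maxTimestampPerDevice = new HashMap<>();

    PerDeviceWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event and returns the updated watermark of that device. */
    long onEvent(int deviceId, long eventTimestampMs) {
        long max = maxTimestampPerDevice.merge(deviceId, eventTimestampMs, Long::max);
        return max - maxOutOfOrdernessMs;
    }

    /** Current watermark of a device, or Long.MIN_VALUE if it has sent nothing yet. */
    long watermarkOf(int deviceId) {
        Long max = maxTimestampPerDevice.get(deviceId);
        return max == null ? Long.MIN_VALUE : max - maxOutOfOrdernessMs;
    }

    /** An event is late only relative to its own device's watermark. */
    boolean isLate(int deviceId, long eventTimestampMs) {
        return eventTimestampMs < watermarkOf(deviceId);
    }

    public static void main(String[] args) {
        PerDeviceWatermarks wm = new PerDeviceWatermarks(100);
        wm.onEvent(1, 1_000); // device 1 lags behind
        wm.onEvent(2, 5_000); // device 2 is far ahead
        System.out.println("late for device 1: " + wm.isLate(1, 950));
        System.out.println("late for device 2: " + wm.isLate(2, 950));
    }
}
```

The point of the sketch is that a timestamp which is acceptable for a slow device can already be late for a fast one, which a single shared watermark cannot express.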
>>>>>
>>>>> Kostas, would you like to add something?
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 9 Nov 2017, at 19:16, Shailesh Jain <shailesh.jain@stellapps.com>
>>>>> wrote:
>>>>>
>>>>> On 1. - is it tied specifically to the number of source operators or
>>>>> to the number of DataStream objects created? I mean, does the answer change
>>>>> if I read all the data from a single Kafka topic, get a DataStream of all
>>>>> events, and then apply N filters to create N individual streams?
>>>>>
>>>>> On 3. - the problem with partitions is that watermarks cannot be
>>>>> different per partition, and since in this use case each stream is from
>>>>> a device, the latency could be different (but the order will almost always
>>>>> be correct) and there is a high chance of losing events in operators
>>>>> like Patterns which work with windows. Any ideas for workarounds here?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Shailesh
>>>>>
>>>>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <piotr@data-artisans.com>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> 1.
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>>>>
>>>>> The number of threads executing would be, roughly speaking, equal to the
>>>>> number of input data streams multiplied by the parallelism.
>>>>>
>>>>> 2.
>>>>> Yes, you could dynamically create more data streams at the job startup.
>>>>>
>>>>> 3.
>>>>> Running 10000 independent data streams on a small cluster (couple of
>>>>> nodes) will definitely be an issue, since even with parallelism set to 1,
>>>>> there would be quite a lot of unnecessary threads.
>>>>>
>>>>> It would be much better to treat your data as a single data input
>>>>> stream with multiple partitions. You could assign partitions between
>>>>> source instances based on parallelism. For example with parallelism 6:
>>>>> - source 0 could get partitions 0, 6, 12, 18
>>>>> - source 1 could get partitions 1, 7, …
>>>>> …
>>>>> - source 5 could get partitions 5, 11, ...
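
The assignment listed above amounts to "partition p goes to source p mod parallelism". A minimal sketch in plain Java, with no Flink dependency, could look like the following. The names `PartitionAssignment` and `partitionsFor` are hypothetical, and the total of 24 partitions is an assumption picked only so that source 0 ends at partition 18 as in the list.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignment {
    /** Returns every partition p with p % parallelism == sourceIndex. */
    static List<Integer> partitionsFor(int sourceIndex, int parallelism, int totalPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = sourceIndex; p < totalPartitions; p += parallelism) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int parallelism = 6;
        int totalPartitions = 24; // assumption, for illustration only
        for (int source = 0; source < parallelism; source++) {
            System.out.println("source " + source + " -> "
                    + partitionsFor(source, parallelism, totalPartitions));
        }
    }
}
```

With this scheme the number of source instances, and therefore of source threads, follows the parallelism rather than the number of partitions or devices.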
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.jain@stellapps.com>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to understand the runtime aspect of Flink when dealing with
>>>>> multiple data streams and multiple operators per data stream.
>>>>>
>>>>> Use case: N data streams in a single Flink job (each data stream
>>>>> representing 1 device - with different time latencies), and each of these
>>>>> data streams gets split into two streams, of which one goes into a bunch
>>>>> of CEP operators, and one into a process function.
>>>>>
>>>>> Questions:
>>>>> 1. At runtime, will the engine create one thread per data stream? Or
>>>>> one thread per operator?
>>>>> 2. Is it possible to dynamically create a data stream at runtime when
>>>>> the job starts? (i.e. if N is read from a file when the job starts and
>>>>> corresponding N streams need to be created)
>>>>> 3. Are there any specific performance impacts when a large number of
>>>>> streams (N ~ 10000) are created, as opposed to N partitions within a
>>>>> single stream?
>>>>>
>>>>> Are there any internal (design) documents which can help understanding
>>>>> the implementation details? Any references to the source will also be
>>>>> really helpful.
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> Shailesh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>> <Screenshot-2017-11-14 Flink Plan Visualizer.png><flink-shailesh-jobmanager-0-shailesh.log><Exception>
>>
>>
>>
>
