flink-user mailing list archives

From Yury Ruchin <yuri.ruc...@gmail.com>
Subject Re: Implementing tee functionality in a streaming job
Date Tue, 20 Dec 2016 15:42:36 GMT
Well, it seems I figured it out. You're right, Fabian, it works the way you
described. I wrote a simple test job:

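import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(Seq.range(0, 100))

stream.addSink(new DiscardingSink[Int]).disableChaining()
stream.map {_ => 1}.countWindowAll(100).sum(0).print().disableChaining()
env.execute() // nothing runs until execute() is called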


I saw that "Records received" was 100 for both the DiscardingSink and
window operators. I also noticed that "Records out" for the
fromCollection() source was 200, and that was the key to understanding. In
the original job I use a Kafka source, and I treated its "Records out" as the
number of records it consumed, but that is not true. The correct
interpretation is "<Records Out> = <Records Consumed> * <Number of
successor operators>". An additional source of confusion for me was some
inaccuracy in the sampled numbers: the "Records In" values in the successor
operators were not exactly equal, which made me think they received different
portions of the stream. I believe the inaccuracy is somewhat intrinsic to
sampling a live stream, so that's fine.
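
To make the arithmetic concrete, here is a toy model of the fan-out in plain Scala, with no Flink dependency. It is only a sketch of the counting, not of how Flink implements it: FanOutModel and fanOut are hypothetical names made up for this illustration, and the model assumes every successor receives the complete stream, as Fabian described.

```scala
// Toy model of operator fan-out (plain Scala, not Flink code).
object FanOutModel {
  // Hypothetical helper: each of the `successors` branches gets the full input.
  def fanOut[A](records: Seq[A], successors: Int): Seq[Seq[A]] =
    Seq.fill(successors)(records)

  def main(args: Array[String]): Unit = {
    val consumed = Seq.range(0, 100) // same input as the test job
    val branches = fanOut(consumed, successors = 2)

    // What each successor sees, and what the source reports as emitted.
    val recordsInPerBranch = branches.map(_.size)
    val recordsOut = recordsInPerBranch.sum

    println(s"records in per successor: ${recordsInPerBranch.mkString(", ")}")
    println(s"records out of source: $recordsOut")
  }
}
```

With 100 consumed records and 2 successors this prints 100 for each successor and 200 for the source, matching the numbers from the test job.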

2016-12-20 14:35 GMT+03:00 Yury Ruchin <yuri.ruchin@gmail.com>:

> Thanks Fabian, I will try creating a toy job illustrating the issue and
> get back.
> 2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhueske@gmail.com>:
>> Hi Yury,
>> your solution should exactly solve your problem.
>> An operator sends all outgoing records to all connected successor
>> operators.
>> There should not be any non-deterministic behavior or splitting of
>> records.
>> Can you share some example code that produces the non-deterministic
>> behavior?
>> Best, Fabian
>> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruchin@gmail.com>:
>>> Hi all,
>>> I have a streaming job in which I want to duplicate the stream at some
>>> point, so that one copy of it ends up in a sink and the other goes down
>>> the processing pipeline. This way I want to implement something similar
>>> to the Linux "tee" utility.
>>> I tried a naive approach that looked like this:
>>> val streamToMirror = env.addSource(mySource).<some operators here>
>>> streamToMirror.addSink(someSink) // (1) tee
>>> streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further
>>> But the above results in the stream being split between someSink (1)
>>> and the further processing (2) in a seemingly nondeterministic way.
>>> I could write through Kafka as shown in this pseudocode:
>>> val headStream = env.addSource(mySource).<some operators here>
>>> headStream.addSink(KafkaSink("myTopic"))
>>> val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
>>> But this would incur additional latency + deserialization overhead that
>>> I would like to avoid.
>>> Is there any way to implement stream teeing as described?
>>> Thanks,
>>> Yury
