flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: finite subset of an infinite data stream
Date Wed, 11 Nov 2015 11:45:30 GMT
I think what you call "union" is a "connected stream" in Flink. Have a look
at this example: https://gist.github.com/fhueske/4ea5422edb5820915fa4
It shows how to dynamically update a list of filters by external requests.
Maybe that's what you are looking for?
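The pattern behind that suggestion can be modelled without a cluster. One input carries data records, the other carries external filter requests, and a single function with one entry point per input keeps the current filter set as state. In Flink this shape corresponds to `dataStream.connect(controlStream).flatMap(...)` with a `CoFlatMapFunction` (methods `flatMap1` and `flatMap2`). The plain-Java classes below are a hypothetical stand-in that only mimics those two callbacks so the logic can run on its own. The class names and the `+X`/`-X` request encoding are assumptions and are not taken from the linked gist.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for a two-input function: flatMap1 receives data
// records, flatMap2 receives external filter requests. It only mirrors the
// shape of Flink's CoFlatMapFunction and has no Flink dependency.
class ControlledFilter {
    // Current filter state: a record passes if its first word is in this set.
    private final Set<String> allowed = new HashSet<>();

    // Data input: forward the record only if an active filter matches it.
    void flatMap1(String record, Consumer<String> out) {
        String firstWord = record.split(" ", 2)[0];
        if (allowed.contains(firstWord)) {
            out.accept(record);
        }
    }

    // Control input (assumed encoding): "+X" adds filter X, "-X" removes it.
    // Requests only change state; nothing is emitted downstream.
    void flatMap2(String request, Consumer<String> out) {
        String key = request.substring(1);
        if (request.startsWith("+")) {
            allowed.add(key);
        } else if (request.startsWith("-")) {
            allowed.remove(key);
        }
    }
}

class ConnectedStreamSketch {
    // Replays interleaved {"data"|"control", value} events in arrival order,
    // as a connected stream would deliver them, and collects the output.
    static List<String> replay(String[][] events) {
        ControlledFilter filter = new ControlledFilter();
        List<String> emitted = new ArrayList<>();
        for (String[] event : events) {
            if (event[0].equals("control")) {
                filter.flatMap2(event[1], emitted::add);
            } else {
                filter.flatMap1(event[1], emitted::add);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"data", "ERROR disk full"},   // no filter yet: dropped
            {"control", "+ERROR"},         // request: pass ERROR records
            {"data", "ERROR disk full"},   // passes
            {"data", "INFO started"},      // dropped
            {"control", "+INFO"},          // request: also pass INFO records
            {"data", "INFO started"},      // passes
            {"control", "-ERROR"},         // request: stop passing ERROR
            {"data", "ERROR again"}        // dropped
        };
        System.out.println(replay(events)); // [ERROR disk full, INFO started]
    }
}
```

In a real job the filter set would live in operator state. With parallelism above one, the control stream would typically also need to be broadcast (for example with `broadcast()`) so that every parallel instance sees each request.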



On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> I do not really understand what exactly you want to do, especially the "union
> an infinite real time data stream with filtered persistent data where the
> condition of filtering is provided by external requests".
>
> If you want to work on substreams in general, there are two options:
>
> 1) Create the substream in a streaming window. You can "cut" the stream
> based on special records/events that signal that the subsequence is done.
> Have a look at the "Trigger" class for windows; it can react to elements
> and their contents:
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
> (section on Advanced Windowing).
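Here is a minimal plain-Java model of option 1. It assumes the end of a subsequence is signalled by a marker record, here the hypothetical value `"END"`. Elements are buffered, and when the marker arrives the buffered group is emitted as one finite batch and the buffer is cleared. This is roughly what a custom Trigger returning FIRE_AND_PURGE on such an element would do. The sketch does not use Flink's Trigger API; it only illustrates the logic such a trigger would implement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a marker-based window: records are collected until
// a marker record arrives; the collected group is then emitted as one finite
// batch and the buffer is cleared (fire, then purge).
class MarkerWindow {
    private final String marker;
    private final Consumer<List<String>> downstream;
    private List<String> buffer = new ArrayList<>();

    MarkerWindow(String marker, Consumer<List<String>> downstream) {
        this.marker = marker;
        this.downstream = downstream;
    }

    void onElement(String record) {
        if (record.equals(marker)) {
            // The marker ends the current subsequence.
            downstream.accept(buffer);
            buffer = new ArrayList<>();
        } else {
            buffer.add(record);
        }
    }
}

class MarkerWindowSketch {
    // Feeds a (finite, for the demo) stream through the window and returns
    // the batches it emitted. Records after the last marker stay buffered.
    static List<List<String>> cut(List<String> stream, String marker) {
        List<List<String>> batches = new ArrayList<>();
        MarkerWindow window = new MarkerWindow(marker, batches::add);
        for (String record : stream) {
            window.onElement(record);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("a", "b", "END", "c", "END", "d");
        System.out.println(cut(stream, "END")); // [[a, b], [c]]
    }
}
```

As the model shows, a trailing group such as `d` is only emitted once another marker arrives. On an endless input that is the desired behaviour, because each marker yields one finite result.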
>
>
> 2) You can trigger sequences of batch jobs. The batch job data source
> (input format) can decide when to stop consuming the stream, at which point
> the remainder of the transformations run, and the batch job finishes.
> You can already run new transformation chains after each call to
> "env.execute()", once the execution finished, to implement the sequence of
> batch jobs.
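Option 2 can be modelled roughly in plain Java. A source wraps the unbounded input and reports that it has reached its end once a stop condition holds, similar in spirit to the `reachedEnd()`/`nextRecord()` pair of a Flink `InputFormat`. A driver loop then runs one finite "job" per chunk, analogous to calling `env.execute()` repeatedly. Everything here is a hypothetical illustration, not Flink code. That includes the class names, the fixed record count used as the stop condition, and the word count used as the job.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical bounded view over an unbounded iterator. reachedEnd() plays
// the role of the input format deciding when the batch job stops consuming;
// the stop condition assumed here is simply a fixed number of records.
class BoundedSource {
    private final Iterator<String> unbounded;
    private final int maxRecords;
    private int consumed = 0;

    BoundedSource(Iterator<String> unbounded, int maxRecords) {
        this.unbounded = unbounded;
        this.maxRecords = maxRecords;
    }

    boolean reachedEnd() {
        return consumed >= maxRecords || !unbounded.hasNext();
    }

    String nextRecord() {
        consumed++;
        return unbounded.next();
    }
}

class RepeatedBatchJobs {
    // An endless word stream standing in for the socket source.
    static Iterator<String> endlessWords() {
        String[] words = {"A", "B", "A"};
        return new Iterator<String>() {
            private int i = 0;
            @Override public boolean hasNext() { return true; }
            @Override public String next() { return words[i++ % words.length]; }
        };
    }

    // One "batch job": read the bounded source to its end, then run the
    // remaining transformation (a word count) on the finite input.
    static Map<String, Integer> runJob(BoundedSource source) {
        Map<String, Integer> counts = new TreeMap<>();
        while (!source.reachedEnd()) {
            counts.merge(source.nextRecord(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Iterator<String> stream = endlessWords();
        // Driver loop: each iteration is one finite job over the next chunk,
        // like calling env.execute() again with a fresh transformation chain.
        for (int job = 1; job <= 2; job++) {
            System.out.println("job " + job + ": " + runJob(new BoundedSource(stream, 4)));
        }
    }
}
```

Here the stream position, held in the shared iterator, lives only in the driver process. If the driver dies, that position is lost, which is the fault-tolerance concern raised in the next paragraph.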
>
>
> I would try to go for the windowing solution if that works, because it
> gives you better fault tolerance and high availability. In the repeated
> batch jobs case, you have to take care yourself of what happens when the
> driver program (the one that calls env.execute()) fails.
>
>
> Hope that helps...
>
> Greetings,
> Stephan
>
>
>
> On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssdev10@gmail.com> wrote:
>
>> Hello,
>>
>>   Thanks for the answer, but windows produce periodic results. I used
>> your example, but with the data source changed to a TCP stream:
>>
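>>         final StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         // Read newline-delimited text from the TCP source on localhost:2015
>>         DataStream<String> text = env.socketTextStream("localhost", 2015, '\n');
>>         // LineSplitter: the (word, 1) tokenizer from the WordCount example
>>         DataStream<Tuple2<String, Integer>> wordCounts =
>>                 text
>>                 .flatMap(new LineSplitter())
>>                 .keyBy(0)
>>                 // 5-second tumbling windows: each window emits its own result,
>>                 // so an unbounded input produces unbounded output
>>                 .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>                 .sum(1);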
>>
>>         wordCounts.print();
>>         env.execute("WordCount Example");
>>
>>  I see results being printed endlessly instead of a single list.
>>
>>  The data source is the following script:
>> -----------------------------------------------------
>> #!/usr/bin/env ruby
>>
>> require 'socket'
>>
>> server = TCPServer.new 2015
>> loop do
>>   Thread.start(server.accept) do |client|
>>     puts Time.now.to_s + ': New client!'
>>     begin
>>       loop do
>>         client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
>>         sleep rand(1000)/1000.0
>>       end
>>     rescue Errno::EPIPE, Errno::ECONNRESET
>>       # The client disconnected; stop writing to it.
>>     ensure
>>       client.close
>>     end
>>   end
>> end
>> -----------------------------------------------------
>>
>>   My goal is to union an infinite real-time data stream with filtered
>> persistent data, where the filtering condition is provided by external
>> requests. Only the result of the union is of interest. In this case I guess
>> I need a way to terminate the stream. Maybe I am wrong.
>>
>>   Moreover, it should be possible to link the streams again on the next
>> request with different filtering criteria, that is, to create a new data
>> transformation chain after env.execute("WordCount Example") has run. Is
>> this possible now? If not, is it possible with minimal changes to the core
>> of Flink?
>>
>> Regards,
>> Roman
>>
>> 2015-11-09 12:34 GMT+04:00 Stephan Ewen <sewen@apache.org>:
>>
>>> Hi!
>>>
>>> If you want to work on subsets of streams, the answer is usually to use
>>> windows, "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
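The computation behind `keyBy(...).timeWindow(...)` can be sketched in plain Java. Each (timestamp, word) event is assigned to the tumbling window starting at `timestamp - timestamp % size`, and a count is kept per (window, word). This is a simplified model that assumes windows aligned to multiples of the window size and processes all events at once, without triggers or late data. It is not Flink's implementation, and the sample events are made up. It also shows why an endless input keeps producing output, as reported later in this thread: every new window period yields another set of results.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyBy(word).timeWindow(size).sum(count): events are
// bucketed into tumbling windows aligned to multiples of the window size.
class TumblingWindowSketch {
    // timestampsMillis[i] is the event time of words[i].
    static Map<Long, Map<String, Integer>> countPerWindow(
            long[] timestampsMillis, String[] words, long windowSizeMillis) {
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            long ts = timestampsMillis[i];
            long windowStart = ts - (ts % windowSizeMillis);
            windows.computeIfAbsent(windowStart, k -> new TreeMap<>())
                   .merge(words[i], 1, Integer::sum);
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] ts = {1_000, 30_000, 59_999, 60_000, 125_000};
        String[] words = {"ABC", "XYZ", "ABC", "ABC", "XYZ"};
        // One result map per window start; more input means more windows.
        System.out.println(countPerWindow(ts, words, minute));
        // {0={ABC=2, XYZ=1}, 60000={ABC=1}, 120000={XYZ=1}}
    }
}
```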
>>>
>>> The transformations that you want to make, do they fit into a window
>>> function?
>>>
>>> There are ideas to introduce something like global time windows
>>> across the entire stream, inside which you could work more in a batch style,
>>> but that is quite an extensive change to the core.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssdev10@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>>
>>>>
>>>> I need to extract a finite subset, like a data buffer, from an infinite
>>>> data stream. The best option for me would be to obtain a finite stream
>>>> with the data accumulated over the previous minute (for example). But I
>>>> have not found any existing technique to do it.
>>>>
>>>>
>>>>
>>>> As possible ways to get something close to a stream’s subset, I see the
>>>> following options:
>>>>
>>>> - a transformation operation like ‘take_while’ that produces a new
>>>> stream but is able to switch it to the FINISHED state. Unfortunately, I
>>>> have not found how to switch the state of a stream from the user code of
>>>> transformation functions;
>>>>
>>>> - new DataStream or StreamSource constructors that allow connecting a
>>>> data processing chain to the source stream. This could be something like
>>>> the take_while transformation mentioned above, or a modified
>>>> StreamSource.run method fed with data from the source stream.
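The proposed take_while can be sketched over a plain Java `Iterator`. It yields elements from a possibly infinite source only while a predicate holds, and it reports `hasNext() == false` permanently (the analogue of the FINISHED state) once the first element fails. `TakeWhileIterator` and `TakeWhileSketch` are hypothetical helpers, not Flink operators. Since Java 9, `Stream.takeWhile` offers the same semantics for `java.util.stream.Stream`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical take_while over an iterator: passes elements through while the
// predicate holds and finishes permanently at the first element that fails it.
class TakeWhileIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final Predicate<? super T> condition;
    private T lookahead;
    private boolean hasLookahead = false;
    private boolean finished = false;

    TakeWhileIterator(Iterator<T> source, Predicate<? super T> condition) {
        this.source = source;
        this.condition = condition;
    }

    @Override
    public boolean hasNext() {
        if (finished) return false;
        if (hasLookahead) return true;
        if (!source.hasNext()) { finished = true; return false; }
        T candidate = source.next();
        if (condition.test(candidate)) {
            lookahead = candidate;
            hasLookahead = true;
            return true;
        }
        // First failing element: switch to the finished state for good.
        finished = true;
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        hasLookahead = false;
        return lookahead;
    }
}

class TakeWhileSketch {
    // Endless source of increasing timestamps (ms), one every 250 ms.
    static Iterator<Long> endlessTimestamps(long start) {
        return new Iterator<Long>() {
            private long current = start;
            @Override public boolean hasNext() { return true; }
            @Override public Long next() { long t = current; current += 250; return t; }
        };
    }

    // Size of the finite prefix of the endless stream lying before the cutoff.
    static int countBefore(long start, long cutoff) {
        Iterator<Long> it = new TakeWhileIterator<>(endlessTimestamps(start), t -> t < cutoff);
        int n = 0;
        while (it.hasNext()) { it.next(); n++; }
        return n;
    }

    public static void main(String[] args) {
        // One minute of data starting at t = 0: timestamps 0, 250, ..., 59750.
        System.out.println(countBefore(0, 60_000)); // 240
    }
}
```

The loop in `countBefore` terminates even though the source never ends, because the wrapper stops pulling from it at the first timestamp outside the minute.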
>>>>
>>>>
>>>>
>>>> So I have two questions.
>>>>
>>>> 1) Is there any technique to extract accumulated data from a stream as a
>>>> stream (to union it with another stream)? This is like a pure buffer mode.
>>>>
>>>> 2) If the answer to the first question is negative, is there something
>>>> like a take_while transformation, or should I consider a custom
>>>> implementation of it? Is it possible to implement it without modifying
>>>> the core of Flink?
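The "pure buffer mode" from question 1 can be illustrated in plain Java. The records of the most recent interval are kept in a deque, and older ones are evicted as new records arrive. On request, the buffer is copied into a finite list that can be unioned with another finite data set, such as records filtered from persistent storage by an external request. The class and method names, the one-minute interval, and the filter are hypothetical. In a distributed job such a buffer would also have to be partitioned and checkpointed, which this sketch ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical "buffer mode": retains only records from the last retentionMillis,
// i.e. records with timestamps in (latest - retentionMillis, latest].
class RecentBuffer {
    static final class Entry {
        final long timestamp;
        final String value;
        Entry(long timestamp, String value) { this.timestamp = timestamp; this.value = value; }
    }

    private final long retentionMillis;
    private final Deque<Entry> entries = new ArrayDeque<>();

    RecentBuffer(long retentionMillis) { this.retentionMillis = retentionMillis; }

    // Appends a record (timestamps assumed non-decreasing) and evicts old ones.
    void add(long timestamp, String value) {
        entries.addLast(new Entry(timestamp, value));
        while (!entries.isEmpty()
                && entries.peekFirst().timestamp <= timestamp - retentionMillis) {
            entries.removeFirst();
        }
    }

    // Finite copy of the current buffer contents, oldest first.
    List<String> snapshot() {
        List<String> out = new ArrayList<>();
        for (Entry e : entries) out.add(e.value);
        return out;
    }
}

class BufferUnionSketch {
    // Union of the buffered stream data with persistent data filtered by a request.
    static List<String> unionWithFiltered(RecentBuffer buffer, List<String> persistent,
                                          Predicate<String> requestFilter) {
        List<String> result = new ArrayList<>(buffer.snapshot());
        for (String record : persistent) {
            if (requestFilter.test(record)) result.add(record);
        }
        return result;
    }

    public static void main(String[] args) {
        RecentBuffer buffer = new RecentBuffer(60_000);
        buffer.add(0, "ABC");        // evicted once a record with t >= 60000 arrives
        buffer.add(30_000, "XYZ");
        buffer.add(70_000, "QRS");   // evicts "ABC"
        List<String> persistent = List.of("old-ABC", "old-XYZ");
        System.out.println(unionWithFiltered(buffer, persistent, r -> r.endsWith("XYZ")));
        // [XYZ, QRS, old-XYZ]
    }
}
```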
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Roman
>>>>
>>>
>>>
>>
>
