flink-user mailing list archives

From: rss rss <rssde...@gmail.com>
Subject: Re: finite subset of an infinite data stream
Date: Mon, 09 Nov 2015 12:24:07 GMT
Hello,

  thanks for the answer, but windows produce periodic results. I used your
example, but changed the data source to a TCP stream:

        DataStream<String> text = env.socketTextStream("localhost", 2015, '\n');
        DataStream<Tuple2<String, Integer>> wordCounts =
                text
                .flatMap(new LineSplitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .sum(1);

        wordCounts.print();
        env.execute("WordCount Example");

 I see results being printed endlessly instead of a single list.
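
 If I understand it right, the endless output comes from the unbounded socket
source: every 5-second window per key fires its own result for as long as the
socket keeps sending. As a quick check (only a sketch; the input strings are
made up, and whether the last, partially filled window still fires at shutdown
depends on the time characteristic), the same pipeline over a bounded source
terminates on its own and prints a finite result:

        // Minimal sketch: the same word-count pipeline, but over a bounded,
        // in-memory source instead of the socket. Uses the same LineSplitter
        // and imports as the program above.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> boundedText = env.fromElements(
                "to be or not to be",
                "that is the question");

        boundedText
                .flatMap(new LineSplitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .sum(1)
                .print();

        // The job terminates once the bounded source is exhausted.
        env.execute("Bounded WordCount");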

 The data source is the following script:
-----------------------------------------------------
#!/usr/bin/env ruby

require 'socket'

server = TCPServer.new 2015
loop do
  Thread.start(server.accept) do |client|
    puts Time.now.to_s + ': New client!'
    loop do
      client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
      sleep rand(1000)/1000.0
    end
    client.close
  end
end
-----------------------------------------------------

  My purpose is to union an infinite real-time data stream with filtered
persistent data, where the filtering condition is provided by external
requests, and only the result of the union is of interest. In this case I
guess I need a way to terminate the stream. Maybe I am wrong.
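
  One way I can imagine to terminate the stream is to bound the source itself:
if the source function returns from run(), the stream is finite and the job
can finish, so the downstream operators emit their results once. Below is only
a rough sketch assuming the current SourceFunction interface; the class name
BoundedSocketSource and the fixed time limit are placeholders of mine:
-----------------------------------------------------
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Sketch of a socket source that stops after a fixed amount of time.
// When run() returns, the source is finished, the stream becomes finite
// and the whole job can terminate on its own.
public class BoundedSocketSource implements SourceFunction<String> {

    private final String host;
    private final int port;
    private final long maxRuntimeMillis;

    private volatile boolean running = true;

    public BoundedSocketSource(String host, int port, long maxRuntimeMillis) {
        this.host = host;
        this.port = port;
        this.maxRuntimeMillis = maxRuntimeMillis;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long deadline = System.currentTimeMillis() + maxRuntimeMillis;
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line;
            // readLine() blocks, so the deadline is only checked between lines.
            while (running
                    && System.currentTimeMillis() < deadline
                    && (line = reader.readLine()) != null) {
                ctx.collect(line);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
-----------------------------------------------------

  Used as text = env.addSource(new BoundedSocketSource("localhost", 2015, 60000)),
the rest of the pipeline above would stay unchanged and print its final counts
once the minute is over.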

  Moreover, it should be possible to link the streams again for the next
request with other filtering criteria, that is, to create a new data
transformation chain after env.execute("WordCount Example") has run. Is that
possible now? If not, is it possible with minimal changes to the core of Flink?
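
  For now, the only pattern I can think of is to build and execute a
short-lived topology per external request instead of extending the running
job. Just a rough sketch of that pattern, not something from the Flink docs;
the path and keyword parameters are placeholders:

        import org.apache.flink.api.common.functions.FilterFunction;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class PerRequestJob {

            // Builds and runs a fresh, bounded job for one external request.
            // 'path' points to the persistent data, 'keyword' is the filter criterion.
            public static void runForRequest(String path, final String keyword) throws Exception {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();

                DataStream<String> persistent = env.readTextFile(path);

                persistent
                        .filter(new FilterFunction<String>() {
                            @Override
                            public boolean filter(String line) {
                                return line.contains(keyword);
                            }
                        })
                        .print();

                // execute() blocks until this bounded job finishes; the next request
                // simply builds another environment and submits another job.
                env.execute("filter-" + keyword);
            }
        }

  Whether the same could be done inside one long-running job, i.e. attaching
new operators after execute(), is exactly my question above.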

Regards,
Roman

2015-11-09 12:34 GMT+04:00 Stephan Ewen <sewen@apache.org>:

> Hi!
>
> If you want to work on subsets of streams, the answer is usually to use
> windows, "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
>
> The transformations that you want to make, do they fit into a window
> function?
>
> There are thoughts to introduce something like global time windows across
> the entire stream, inside which you can work more in a batch-style, but
> that is quite an extensive change to the core.
>
> Greetings,
> Stephan
>
>
> On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssdev10@gmail.com> wrote:
>
>> Hello,
>>
>>
>>
>> I need to extract a finite subset, like a data buffer, from an infinite
>> data stream. The best way for me would be to obtain a finite stream with the
>> data accumulated over the last minute (as an example). But I have not found
>> any existing technique to do it.
>>
>>
>>
>> As possible ways to do something close to a subset of a stream, I see the
>> following cases:
>>
>> - some transformation operation like ‘take_while’ that produces a
>> new stream but is able to switch it to the FINISHED state. Unfortunately I
>> have not found a way to switch the state of a stream from the user code of
>> transformation functions;
>>
>> - a new DataStream or StreamSource constructor that allows connecting a
>> data processing chain to the source stream. It may be something
>> like the mentioned take_while transform function or a modified StreamSource.run
>> method fed with data from the source stream.
>>
>>
>>
>> That is, I have two questions.
>>
>> 1) Is there any technique to extract accumulated data from a stream
>> as a stream (to union it with another stream)? This would be like a pure
>> buffer mode.
>>
>> 2) If the answer to the first question is negative, is there something
>> like a take_while transformation, or should I think about a custom
>> implementation of it? Is it possible to implement it without modifying
>> the core of Flink?
>>
>>
>>
>> Regards,
>>
>> Roman
>>
>
>
