flink-user mailing list archives

From rss rss <rssde...@gmail.com>
Subject Re: finite subset of an infinite data stream
Date Sun, 22 Nov 2015 19:52:56 GMT
Hello,

  I have prepared a prototype of the batch subsystem described in the
previous mail:
https://github.com/rssdev10/flink-experiments/tree/init_by_kafka_sink_kafka
It does not yet contain correct Kafka serialization/deserialization,
because I have not figured out how to do that. But it does contain code
that starts Flink batch processing in response to a message from a Kafka
queue -
https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/Consumer.java
. This code is based on Kafka's examples.

  The question is the following: is this a correct implementation of
Flink batch API activation? See the method Consumer::run.
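
  To make the question concrete, this is the pattern I am aiming at,
reduced to a sketch. awaitControlMessage() is a hypothetical placeholder
for the blocking Kafka consumer loop, and the file paths are placeholders
too:

    while (true) {
        String query = awaitControlMessage();  // hypothetical: blocks on the Kafka queue

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile("file:///tmp/archive")            // placeholder persistent storage
           .filter(line -> line.contains(query))
           .writeAsText("file:///tmp/result-" + query);    // or a custom Kafka sink
        env.execute("batch triggered by " + query);        // one finite batch job per message
    }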

  I have also added a sink for Kafka -
https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/KafkaOutputFormat.java
Is it correct? Unfortunately, Flink's documentation does not contain an
example of a custom sink implementation:
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#data-sinks
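
  For reference, a minimal custom OutputFormat has roughly this shape (a
sketch only, using Kafka's newer producer API; the broker address and
topic name are placeholders, and error handling is omitted):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleKafkaOutputFormat implements OutputFormat<String> {
        private transient KafkaProducer<String, String> producer;

        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            // one producer per parallel task instance
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }

        @Override
        public void writeRecord(String record) throws IOException {
            producer.send(new ProducerRecord<>("output-topic", record));
        }

        @Override
        public void close() throws IOException {
            if (producer != null) {
                producer.close();
            }
        }
    }

It would be used from the batch API via dataSet.output(new
SimpleKafkaOutputFormat()).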

  The program works, at least in the mode where a single queue is looped
back on itself by a consumer-producer pair and is activated by a single
message from Kafka's console producer.

Best regards,
Roman

2015-11-20 16:05 GMT+03:00 rss rss <rssdev10@gmail.com>:

> Hello Aljoscha,
>
>   Thanks, I looked at your code. I think it is really useful for getting
> real-time data from sensors, and as a simple example it fits a modern
> Internet of Things context. E.g. there is a temperature sensor or a water
> flow sensor, and I want to build a simple application where the data
> flowing from the sensors is saved to persistent storage, while a small
> real-time buffer is used for visualizing it on a display in response to a
> query.
>
>   But my question also has a second part: I need to link the real-time
> data with data from persistent storage, and I don't see how your example
> helps with that. Suppose the input query contains some data-fetch
> condition. In this case we have to build a separate DataSet or DataStream
> against the persistent storage with the specified conditions. It may be
> SQL or a simple map(...).filter("something"). But the main obstacle is how
> to set up a new data-processing pipeline from inside the current stream
> transformation, e.g. from inside the map function of the connected query
> stream.
>
>   A week ago I prepared another schema for solving my task, with separate
> streaming and batch subsystems. See the attached image. It may be changed
> according to your example, but I don't see any way to solve the task other
> than issuing separate queries to the persistent storage in the batch part.
>
> [image: Inline image 1]
>
>   Note that this schema also describes an idea for emulating a real-time
> buffer by means of Kafka: the windowed stream continuously produces data
> sequences and sinks them into an external queue with a limited storing
> time, or with no storing at all. Any consumer connected to the queue then
> receives up-to-date data. I don't like this idea because of the extra
> network communication, but it looks workable. A sketch follows below.
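>
>   Reduced to code, the idea could look roughly like this (a sketch only;
> the sink is a naive SinkFunction around a Kafka producer, the broker
> address and topic name are placeholders, and the "limited storing time"
> is Kafka's own retention.ms setting on the topic, e.g. retention.ms=60000):
>
>     public static class RtBufferSink implements SinkFunction<Tuple2<String, Integer>> {
>         private transient KafkaProducer<String, String> producer;
>
>         @Override
>         public void invoke(Tuple2<String, Integer> value) throws Exception {
>             if (producer == null) {  // lazy init on the worker
>                 Properties props = new Properties();
>                 props.put("bootstrap.servers", "localhost:9092");
>                 props.put("key.serializer",
>                     "org.apache.kafka.common.serialization.StringSerializer");
>                 props.put("value.serializer",
>                     "org.apache.kafka.common.serialization.StringSerializer");
>                 producer = new KafkaProducer<>(props);
>             }
>             producer.send(new ProducerRecord<>("rt-buffer", value.toString()));
>         }
>     }
>
>     // every minute the current window result is pushed into the queue
>     env.socketTextStream("localhost", 2015, '\n')
>        .flatMap(new LineSplitter())
>        .keyBy(0)
>        .timeWindow(Time.of(1, TimeUnit.MINUTES))
>        .sum(1)
>        .addSink(new RtBufferSink());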
>
>   BTW, this is something like a lambda/kappa architecture implementation.
> I don't like these terms, but that is what it actually is.
>
> Best regards,
> Roman
>
>
> 2015-11-20 13:26 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>
>> Hi,
>> I’m very sorry, yes you would need my custom branch:
>> https://github.com/aljoscha/flink/commits/state-enhance
>>
>> Cheers,
>> Aljoscha
>> > On 20 Nov 2015, at 10:13, rss rss <rssdev10@gmail.com> wrote:
>> >
>> > Hello Aljoscha,
>> >
>> >   many thanks. I tried to build your example but hit an obstacle with
>> the org.apache.flink.runtime.state.AbstractStateBackend class. Where can I
>> get it? I guess it is stored in your local branch only. Would you please
>> send me patches against the public branch, or share the branch with me?
>> >
>> > Best regards,
>> > Roman
>> >
>> >
>> > 2015-11-18 17:24 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>> > Hi,
>> > I wrote a little example that could be what you are looking for:
>> https://github.com/dataArtisans/query-window-example
>> >
>> > It basically implements a window operator with a modifiable window size
>> that also allows querying the current accumulated window contents using a
>> second input stream.
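>> >
>> > (Not the actual code from the repository - just a rough sketch of the
>> > general pattern, with invented names: a connected stream where one input
>> > fills a per-instance buffer and the other input dumps its current
>> > contents. The buffer here is plain Java state, not fault tolerant.)
>> >
>> >     DataStream<String> data = env.socketTextStream("localhost", 2015, '\n');
>> >     DataStream<String> queries = env.socketTextStream("localhost", 2016, '\n');
>> >
>> >     data.connect(queries)
>> >         .flatMap(new CoFlatMapFunction<String, String, String>() {
>> >             private final List<String> buffer = new ArrayList<>();
>> >
>> >             @Override
>> >             public void flatMap1(String value, Collector<String> out) {
>> >                 buffer.add(value);            // data input: accumulate
>> >             }
>> >
>> >             @Override
>> >             public void flatMap2(String query, Collector<String> out) {
>> >                 for (String s : buffer) {     // query input: emit contents
>> >                     out.collect(s);
>> >                 }
>> >             }
>> >         })
>> >         .print();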
>> >
>> > There is a README file in the github repository, but please let me know
>> if you need further explanations.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > > On 18 Nov 2015, at 12:02, Robert Metzger <rmetzger@apache.org> wrote:
>> > >
>> > > Hi Roman,
>> > >
>> > > I've updated the documentation. It seems that it got out of sync.
>> Thank you for notifying us about this.
>> > >
>> > > My colleague Aljoscha has some experimental code that is probably
>> doing what you are looking for: A standing window (your RT-buffer) that you
>> can query using a secondary stream (your user's queries).
>> > > He'll post the code soon to this email thread.
>> > >
>> > > Regards,
>> > > Robert
>> > >
>> > >
>> > > On Wed, Nov 11, 2015 at 2:51 PM, rss rss <rssdev10@gmail.com> wrote:
>> > > Hello,
>> > >
>> > >   thanks, Stephan, but triggers are not what I was searching for. And
>> BTW, the documentation is obsolete: there is no Count class anymore, I
>> found only CountTrigger.
>> > >
>> > >   Thanks, Robert, your example may be useful to me, but on a different
>> point. By "union" I meant an ordinary union of similar data, the same as
>> "union" in the DataStream documentation.
>> > >
>> > >   The task is very simple. We have an infinite stream of data from
>> sensors, a billing system, etc. It does not matter what the data is; what
>> matters is that it is infinite. We have to store the data in some
>> persistent storage to be able to run analytical queries later, and there
>> is a stream of users' analytical queries. But the input data stream is
>> big, the time needed to save it to the persistent storage is long, and we
>> do not have a very fast big-data OLTP storage. That is, the data extracted
>> from the persistent storage for a user's request will probably not contain
>> the most recent data. So we need some real-time buffer (RT-Buffer in the
>> schema) with the most recent data, and we have to union it with the
>> processing results from the persistent storage (I am not talking about
>> data deduplication and ordering for now). And of course the users' queries
>> are unpredictable with regard to data-filtering conditions.
>> > >
>> > >   The attached schema is an attempt to understand how this may be
>> implemented with Flink. I tried to imagine how to implement it with
>> Flink's streaming API but found obstacles. This schema is not the first
>> variant; it contains a separate driver program that configures new jobs
>> from users' queries. The reason is that I have not found a way to link the
>> stream of users' queries with further data processing. But it is somewhat
>> close to
>> https://gist.github.com/fhueske/4ea5422edb5820915fa4
>> > >
>> > >
>> > > <flink_streams.png>
>> > >
>> > >   The main question is how to process each user's query, combining the
>> current data from the real-time buffer with a batch request to the
>> persistent storage. Unfortunately, I have not found a solution using the
>> Streaming API alone.
>> > >
>> > > Regards,
>> > > Roman
>> > >
>> > > 2015-11-11 15:45 GMT+04:00 Robert Metzger <rmetzger@apache.org>:
>> > > I think what you call "union" is a "connected stream" in Flink. Have
>> a look at this example:
>> https://gist.github.com/fhueske/4ea5422edb5820915fa4
>> > > It shows how to dynamically update a list of filters by external
>> requests.
>> > > Maybe that's what you are looking for?
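>> > >
>> > > Roughly, the idea in that gist (with invented names here): the second
>> > > input of a connected stream updates the set of filter terms that the
>> > > first input is checked against. Note that each parallel instance keeps
>> > > its own term set unless the control stream is broadcast:
>> > >
>> > >     mainStream.connect(filterRequests)
>> > >         .flatMap(new CoFlatMapFunction<String, String, String>() {
>> > >             private final Set<String> terms = new HashSet<>();
>> > >
>> > >             @Override
>> > >             public void flatMap1(String value, Collector<String> out) {
>> > >                 for (String t : terms) {
>> > >                     if (value.contains(t)) {
>> > >                         out.collect(value);   // passes the current filters
>> > >                         return;
>> > >                     }
>> > >                 }
>> > >             }
>> > >
>> > >             @Override
>> > >             public void flatMap2(String newTerm, Collector<String> out) {
>> > >                 terms.add(newTerm);           // external request adds a term
>> > >             }
>> > >         });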
>> > >
>> > >
>> > >
>> > > On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <sewen@apache.org>
>> wrote:
>> > > Hi!
>> > >
>> > > I do not really understand what exactly you want to do, especially
>> the "union an infinite real time data stream with filtered persistent data
>> where the condition of filtering is provided by external requests" part.
>> > >
>> > > If you want to work on substreams in general, there are two options:
>> > >
>> > > 1) Create the substream in a streaming window. You can "cut" the
>> stream based on special records/events that signal that the subsequence is
>> done. Have a look at the "Trigger" class for windows, it can react to
>> elements and their contents (a sketch follows after the two options):
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
>> (section on Advanced Windowing).
>> > >
>> > >
>> > > 2) You can trigger sequences of batch jobs. The batch job data source
>> (input format) can decide when to stop consuming the stream, at which point
>> the remainder of the transformations run, and the batch job finishes.
>> > > You can already run new transformation chains after each call to
>> "env.execute()", once the execution has finished, to implement the sequence
>> of batch jobs.
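>> > >
>> > > For option 1, a rough sketch (the exact Trigger signatures vary
>> > > between Flink versions, so take this as the general shape only; here a
>> > > special "END" record fires and purges the window):
>> > >
>> > >     public class EndMarkerTrigger implements Trigger<String, GlobalWindow> {
>> > >         @Override
>> > >         public TriggerResult onElement(String element, long timestamp,
>> > >                 GlobalWindow window, TriggerContext ctx) {
>> > >             // emit and clear the window when the marker record arrives
>> > >             return "END".equals(element)
>> > >                     ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
>> > >         }
>> > >
>> > >         @Override
>> > >         public TriggerResult onProcessingTime(long time,
>> > >                 GlobalWindow window, TriggerContext ctx) {
>> > >             return TriggerResult.CONTINUE;
>> > >         }
>> > >
>> > >         @Override
>> > >         public TriggerResult onEventTime(long time,
>> > >                 GlobalWindow window, TriggerContext ctx) {
>> > >             return TriggerResult.CONTINUE;
>> > >         }
>> > >     }
>> > >
>> > > used as stream.keyBy(0).window(GlobalWindows.create()).trigger(new
>> > > EndMarkerTrigger()). And for option 2, the loop is simply repeated
>> > > plans on the same environment (paths are placeholders):
>> > >
>> > >     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> > >     for (int i = 0; i < 3; i++) {
>> > >         env.readTextFile("file:///tmp/input-" + i)
>> > >            .filter(line -> !line.isEmpty())
>> > >            .writeAsText("file:///tmp/output-" + i);
>> > >         env.execute("batch job " + i);    // one finite job per iteration
>> > >     }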
>> > >
>> > >
>> > > I would try to go for the windowing solution if that works, because
>> that will give you better fault tolerance / high availability. In the
>> repeated batch jobs case, you need to worry about what happens when the
>> driver program (that calls env.execute()) fails.
>> > >
>> > >
>> > > Hope that helps...
>> > >
>> > > Greetings,
>> > > Stephan
>> > >
>> > >
>> > >
>> > > On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssdev10@gmail.com> wrote:
>> > > Hello,
>> > >
>> > >   thanks for the answer, but windows produce periodic results. I
>> used your example, but with the data source changed to a TCP stream:
>> > >
>> > >         DataStream<String> text = env.socketTextStream("localhost",
>> 2015, '\n');
>> > >         DataStream<Tuple2<String, Integer>> wordCounts =
>> > >                 text
>> > >                 .flatMap(new LineSplitter())
>> > >                 .keyBy(0)
>> > >                 .timeWindow(Time.of(5, TimeUnit.SECONDS))
>> > >                 .sum(1);
>> > >
>> > >         wordCounts.print();
>> > >         env.execute("WordCount Example");
>> > >
>> > >  I see results being printed infinitely instead of a single list.
>> > >
>> > >  The data source is the following script:
>> > > -----------------------------------------------------
>> > > #!/usr/bin/env ruby
>> > >
>> > > require 'socket'
>> > >
>> > > server = TCPServer.new 2015
>> > > loop do
>> > >   Thread.start(server.accept) do |client|
>> > >     puts Time.now.to_s + ': New client!'
>> > >     loop do
>> > >       client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
>> > >       sleep rand(1000)/1000.0
>> > >     end
>> > >     client.close
>> > >   end
>> > > end
>> > > -----------------------------------------------------
>> > >
>> > >   My purpose is to union an infinite real-time data stream with
>> filtered persistent data, where the filtering condition is provided by
>> external requests, and only the result of the union is of interest. In
>> this case I guess I need a way to terminate the stream, but maybe I am
>> wrong.
>> > >
>> > >   Moreover, it should be possible to link the streams for the next
>> request, with other filtering criteria - that is, to create a new data
>> transformation chain after env.execute("WordCount Example") has run. Is
>> that possible now? If not, is it possible with minimal changes to the core
>> of Flink?
>> > >
>> > > Regards,
>> > > Roman
>> > >
>> > > 2015-11-09 12:34 GMT+04:00 Stephan Ewen <sewen@apache.org>:
>> > > Hi!
>> > >
>> > > If you want to work on subsets of streams, the answer is usually to
>> use windows, "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
>> > >
>> > > The transformations that you want to make, do they fit into a window
>> function?
>> > >
>> > > There are thoughts to introduce something like global time windows
>> across the entire stream, inside which you can work more in a batch style,
>> but that is quite an extensive change to the core.
>> > >
>> > > Greetings,
>> > > Stephan
>> > >
>> > >
>> > > On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssdev10@gmail.com> wrote:
>> > > Hello,
>> > >
>> > >
>> > > I need to extract a finite subset, like a data buffer, from an infinite
>> data stream. The best way for me would be to obtain a finite stream with the
>> data accumulated over the previous minute (as an example). But I have not
>> found any existing technique for doing this.
>> > >
>> > >
>> > > As possible ways to do something close to a subset of a stream, I see
>> the following cases (a sketch of the first idea follows below):
>> > >
>> > > -          a transformation operation like ‘take_while’ that produces a
>> new stream but is able to switch it to the FINISHED state. Unfortunately, I
>> have not found how to switch the state of a stream from the user code of
>> transformation functions;
>> > >
>> > > -          new DataStream or StreamSource constructors which allow
>> connecting a data processing chain to the source stream. It may be something
>> like the mentioned take_while transformation function, or a modified
>> StreamSource.run method fed with the data from the source stream.
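>> > >
>> > > For illustration, a take_while-like behaviour can be sketched as a
>> > > custom source (the names and the end condition are invented): when
>> > > run() returns, the source - and with it the stream - reaches the
>> > > FINISHED state and the job can end.
>> > >
>> > >     public class TakeWhileSocketSource implements SourceFunction<String> {
>> > >         private volatile boolean running = true;
>> > >
>> > >         @Override
>> > >         public void run(SourceContext<String> ctx) throws Exception {
>> > >             try (Socket socket = new Socket("localhost", 2015);
>> > >                  BufferedReader in = new BufferedReader(
>> > >                          new InputStreamReader(socket.getInputStream()))) {
>> > >                 String line;
>> > >                 // emit only while the predicate holds; returning
>> > >                 // finishes the (now finite) stream
>> > >                 while (running && (line = in.readLine()) != null
>> > >                         && !line.startsWith("END")) {
>> > >                     ctx.collect(line);
>> > >                 }
>> > >             }
>> > >         }
>> > >
>> > >         @Override
>> > >         public void cancel() {
>> > >             running = false;
>> > >         }
>> > >     }
>> > >
>> > > used as env.addSource(new TakeWhileSocketSource()).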
>> > >
>> > >
>> > > That is, I have two questions.
>> > >
>> > > 1)      Is there any technique to extract accumulated data from a
>> stream as a stream (to union it with another stream)? This would be like a
>> pure buffer mode.
>> > >
>> > > 2)      If the answer to the first question is negative, is there
>> something like a take_while transformation, or should I think about a
>> custom implementation of it? Is it possible to implement it without
>> modifying the core of Flink?
>> > >
>> > >
>> > > Regards,
>> > >
>> > > Roman
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> >
>> >
>>
>>
>
