flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: finite subset of an infinite data stream
Date Wed, 25 Nov 2015 17:51:27 GMT
Hi Roman,

I've looked through your code here:
https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/Consumer.java

The implementation is not very efficient. Flink is not like Spark, where the
job's driver constantly interacts with the program running in the cluster:
Flink generates a plan from your code and submits it to the cluster [1].
Your code submits a new plan to the cluster for every message in Kafka, so
it would be faster to process the data locally.

I still think that you can use the DataStream API of Flink. Maybe use this
example as a starting point:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));


Then use the "stream" object to perform your transformations.
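
For example, a windowed word count on that stream could look like this (just
a sketch; adjust the parsing to your data format):

DataStream<Tuple2<String, Integer>> counts = stream
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            // assumes whitespace-separated words
            for (String word : line.split("\\s+")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    })
    .keyBy(0)
    .timeWindow(Time.of(1, TimeUnit.MINUTES))
    .sum(1);

counts.print();
env.execute("Kafka example");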

The output format looks good!


[1]
https://ci.apache.org/projects/flink/flink-docs-master/internals/general_arch.html


On Sun, Nov 22, 2015 at 8:52 PM, rss rss <rssdev10@gmail.com> wrote:

> Hello,
>
>   I have prepared a prototype of the batch subsystem that I described in the
> previous mail.
> https://github.com/rssdev10/flink-experiments/tree/init_by_kafka_sink_kafka
> It does not contain correct Kafka serialization/deserialization because I
> have not figured out how to do that yet. But it contains code for running a
> Flink batch job triggered by a message from a Kafka queue:
> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/Consumer.java
> This code is based on Kafka's examples.
>
>   My question is the following: is this a correct way to activate the Flink
> batch API? See the method Consumer::run.
>
>   I also added a sink for Kafka -
> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/KafkaOutputFormat.java
> Is it correct? Unfortunately, the Flink documentation does not contain
> examples of a custom sink implementation:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#data-sinks
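>
> For reference, my current understanding of the minimal OutputFormat
> contract is roughly the following (a rough sketch only, not my actual
> code; the KafkaProducer wiring and the topic name are my assumptions):
>
> public class SketchKafkaOutputFormat implements OutputFormat<String> {
>     private transient KafkaProducer<String, String> producer;
>     private final String topic = "output-topic";  // hypothetical topic name
>
>     @Override
>     public void configure(Configuration parameters) {}
>
>     @Override
>     public void open(int taskNumber, int numTasks) throws IOException {
>         // one producer per parallel task instance
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "localhost:9092");
>         props.setProperty("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.setProperty("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         producer = new KafkaProducer<>(props);
>     }
>
>     @Override
>     public void writeRecord(String record) throws IOException {
>         producer.send(new ProducerRecord<>(topic, record));
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (producer != null) {
>             producer.close();
>         }
>     }
> }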
>
>   The program works, at least in the mode of a single queue looped back on
> itself by a consumer-producer pair and activated by a single message from
> Kafka's console producer.
>
> Best regards,
> Roman
>
> 2015-11-20 16:05 GMT+03:00 rss rss <rssdev10@gmail.com>:
>
>> Hello Aljoscha,
>>
>>   Thanks, I looked at your code. I think it is really useful for getting
>> real-time data from sensors, and as a simple example it fits a modern
>> Internet of Things context. E.g. there is a temperature sensor or a water
>> flow sensor, and I want to build a simple application where the data flow
>> from the sensors is saved to persistent storage, while a small real-time
>> buffer is used for visualizing the data on a display in response to a query.
>>
>>   But at the same time my question has a second part. I need to link the
>> real-time data with data from persistent storage, and I don't see how your
>> example helps with that. Suppose the input query contains some data fetch
>> condition. In this case we have to build a separate DataSet or DataStream
>> against persistent storage with the specified conditions. It may be SQL or
>> a simple map(...).filter("something"). But the main obstacle is how to
>> configure a new data processing chain from inside the current stream
>> transformation, e.g. from inside the connected query stream's map function.
>>
>>   A week ago I prepared another schema for solving my task, with separate
>> streaming and batch subsystems. See the attached image. It may be adapted
>> to your example, but I don't see any other way to solve the task than
>> issuing separate queries to persistent storage in the batch part.
>>
>> [image: embedded image 1]
>>
>>   Also note that this schema describes an idea for emulating a real-time
>> buffer by means of Kafka. The windowed stream infinitely produces data
>> sequences and sinks them into an external queue with a limited retention
>> time, or with no retention at all. Any consumer connected to the queue
>> receives the current data. I don't like this idea because of the extra
>> network communication, but it looks workable.
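>>
>> Roughly what I have in mind (a sketch only; the producer class name
>> depends on the Flink version, and the limited retention would be
>> configured on the Kafka topic itself, e.g. via retention.ms, not in
>> Flink):
>>
>> stream
>>     .keyBy(0)
>>     .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>     .sum(1)
>>     .map(new MapFunction<Tuple2<String, Integer>, String>() {
>>         @Override
>>         public String map(Tuple2<String, Integer> value) {
>>             return value.toString();  // serialize window results as strings
>>         }
>>     })
>>     .addSink(new FlinkKafkaProducer<>("localhost:9092", "rt-buffer",
>>             new SimpleStringSchema()));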
>>
>>   BTW: it is something like a lambda/kappa architecture implementation. I
>> don't like these terms, but that is what it is.
>>
>> Best regards,
>> Roman
>>
>>
>> 2015-11-20 13:26 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>>
>>> Hi,
>>> I’m very sorry, yes you would need my custom branch:
>>> https://github.com/aljoscha/flink/commits/state-enhance
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 20 Nov 2015, at 10:13, rss rss <rssdev10@gmail.com> wrote:
>>> >
>>> > Hello Aljoscha,
>>> >
>>> >   many thanks. I tried to build your example but hit an obstacle with
>>> the org.apache.flink.runtime.state.AbstractStateBackend class. Where can I
>>> get it? I guess it is stored in your local branch only. Would you please
>>> send me patches for the public branch or share the branch with me?
>>> >
>>> > Best regards,
>>> > Roman
>>> >
>>> >
>>> > 2015-11-18 17:24 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>>> > Hi,
>>> > I wrote a little example that could be what you are looking for:
>>> https://github.com/dataArtisans/query-window-example
>>> >
>>> > It basically implements a window operator with a modifiable window
>>> size that also allows querying the current accumulated window contents
>>> using a second input stream.
>>> >
>>> > There is a README file in the github repository, but please let me
>>> know if you need further explanations.
>>> >
>>> > Cheers,
>>> > Aljoscha
>>> >
>>> > > On 18 Nov 2015, at 12:02, Robert Metzger <rmetzger@apache.org>
>>> wrote:
>>> > >
>>> > > Hi Roman,
>>> > >
>>> > > I've updated the documentation. It seems that it got out of sync.
>>> Thank you for notifying us about this.
>>> > >
>>> > > My colleague Aljoscha has some experimental code that is probably
>>> doing what you are looking for: A standing window (your RT-buffer) that you
>>> can query using a secondary stream (your user's queries).
>>> > > He'll post the code soon to this email thread.
>>> > >
>>> > > Regards,
>>> > > Robert
>>> > >
>>> > >
>>> > > On Wed, Nov 11, 2015 at 2:51 PM, rss rss <rssdev10@gmail.com>
wrote:
>>> > > Hello,
>>> > >
>>> > >   Thanks, Stephan, but triggers are not what I was searching for. And
>>> BTW, the documentation is obsolete: there is no Count class now; I found
>>> CountTrigger only.
>>> > >
>>> > >   Thanks, Robert, your example may be useful to me, but in a different
>>> respect. I meant "union" as an ordinary union of similar data, the same as
>>> "union" in the DataStream documentation.
>>> > >
>>> > >   The task is very simple. We have an infinite stream of data from
>>> sensors, a billing system, etc. It does not matter what it is, but it is
>>> infinite. We have to store the data in some persistent storage to be able
>>> to run analytical queries later. And there is a stream of users' analytical
>>> queries. But the stream of input data is big, and saving it to persistent
>>> storage takes a long time too. And we do not have a very fast big data OLTP
>>> storage. That is, the data extracted from persistent storage for a user's
>>> request probably will not contain the most recent data. We need a real-time
>>> buffer (RT-Buffer in the schema) with the current data and have to union it
>>> with the processing results from persistent storage (I am not speaking
>>> about data deduplication and ordering now). And of course the users'
>>> queries are unpredictable regarding data filtering conditions.
>>> > >
>>> > >   The attached schema is an attempt to understand how this may be
>>> implemented with Flink. I tried to imagine how to implement it with Flink's
>>> streaming API but found obstacles. This schema is not the first variant: it
>>> contains a separate driver program that configures new jobs from users'
>>> queries, because I did not find a way to link the stream of users' queries
>>> with further data processing. But it is somewhat close to
>>> https://gist.github.com/fhueske/4ea5422edb5820915fa4
>>> > >
>>> > >
>>> > > <flink_streams.png>
>>> > >
>>> > >   The main question is how to process each user's query, combining the
>>> current data from the real-time buffer with a batch request to the
>>> persistent storage. Unfortunately, I have not found a solution using the
>>> Streaming API alone.
>>> > >
>>> > > Regards,
>>> > > Roman
>>> > >
>>> > > 2015-11-11 15:45 GMT+04:00 Robert Metzger <rmetzger@apache.org>:
>>> > > I think what you call "union" is a "connected stream" in Flink. Have
>>> a look at this example:
>>> https://gist.github.com/fhueske/4ea5422edb5820915fa4
>>> > > It shows how to dynamically update a list of filters by external
>>> requests.
>>> > > Maybe that's what you are looking for?
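>>> > >
>>> > > The core pattern in that example is roughly this (a sketch, untested;
>>> > > the stream sources are omitted):
>>> > >
>>> > > DataStream<String> data = ...;      // main data stream
>>> > > DataStream<String> queries = ...;   // stream of external requests
>>> > > data.connect(queries)
>>> > >     .flatMap(new CoFlatMapFunction<String, String, String>() {
>>> > >         private String filter = "";  // last filter received on the query stream
>>> > >         @Override
>>> > >         public void flatMap1(String value, Collector<String> out) {
>>> > >             if (value.contains(filter)) {
>>> > >                 out.collect(value);  // data element passes the current filter
>>> > >             }
>>> > >         }
>>> > >         @Override
>>> > >         public void flatMap2(String query, Collector<String> out) {
>>> > >             filter = query;  // control element updates the filter
>>> > >         }
>>> > >     });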
>>> > >
>>> > >
>>> > >
>>> > > On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <sewen@apache.org>
>>> wrote:
>>> > > Hi!
>>> > >
>>> > > I do not really understand what exactly you want to do, especially
>>> the "union an infinite real time data stream with filtered persistent data
>>> where the condition of filtering is provided by external requests".
>>> > >
>>> > > If you want to work on substreams in general, there are two options:
>>> > >
>>> > > 1) Create the substream in a streaming window. You can "cut" the
>>> stream based on special records/events that signal that the subsequence is
>>> done. Have a look at the "Trigger" class for windows, it can react to
>>> elements and their contents:
>>> > >
>>> > >
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
>>> (section on Advanced Windowing).
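>>> > >
>>> > > For example (a sketch; the exact trigger API may differ between Flink
>>> > > versions, and a custom Trigger can likewise inspect element contents):
>>> > >
>>> > > stream
>>> > >     .keyBy(0)
>>> > >     .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>> > >     .trigger(CountTrigger.of(100))  // fire after 100 elements instead of on time
>>> > >     .sum(1);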
>>> > >
>>> > >
>>> > > 2) You can trigger sequences of batch jobs. The batch job data
>>> source (input format) can decide when to stop consuming the stream, at
>>> which point the remainder of the transformations run, and the batch job
>>> finishes.
>>> > > You can already run new transformation chains after each call to
>>> "env.execute()", once the execution has finished, to implement the
>>> sequence of batch jobs.
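>>> > >
>>> > > In code, the driver loop could look roughly like this (a sketch;
>>> > > nextQuery() is a hypothetical blocking helper reading your query
>>> > > source):
>>> > >
>>> > > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> > > while (true) {
>>> > >     final String query = nextQuery();  // hypothetical: blocks until a query arrives
>>> > >     env.readTextFile("hdfs:///data")
>>> > >        .filter(line -> line.contains(query))  // the query's filter condition
>>> > >        .writeAsText("hdfs:///results/" + query.hashCode());
>>> > >     env.execute("query-" + query);
>>> > > }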
>>> > >
>>> > >
>>> > > I would try and go for the windowing solution if that works, because
>>> that will give you better fault tolerance / high availability. In the
>>> repeated batch jobs case, you need to worry about what happens
>>> when the driver program (that calls env.execute()) fails.
>>> > >
>>> > >
>>> > > Hope that helps...
>>> > >
>>> > > Greetings,
>>> > > Stephan
>>> > >
>>> > >
>>> > >
>>> > > On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssdev10@gmail.com>
wrote:
>>> > > Hello,
>>> > >
>>> > >   thanks for the answer, but windows produce periodic results. I
>>> used your example, but changed the data source to a TCP stream:
>>> > >
>>> > >         DataStream<String> text =
>>> > >                 env.socketTextStream("localhost", 2015, '\n');
>>> > >         DataStream<Tuple2<String, Integer>> wordCounts = text
>>> > >                 .flatMap(new LineSplitter())
>>> > >                 .keyBy(0)
>>> > >                 .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>> > >                 .sum(1);
>>> > >
>>> > >         wordCounts.print();
>>> > >         env.execute("WordCount Example");
>>> > >
>>> > >  I see infinite printing of results instead of a single list.
>>> > >
>>> > >  The data source is the following script:
>>> > > -----------------------------------------------------
>>> > > #!/usr/bin/env ruby
>>> > >
>>> > > require 'socket'
>>> > >
>>> > > server = TCPServer.new 2015
>>> > > loop do
>>> > >   Thread.start(server.accept) do |client|
>>> > >     puts Time.now.to_s + ': New client!'
>>> > >     loop do
>>> > >       client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
>>> > >       sleep rand(1000)/1000.0
>>> > >     end
>>> > >     client.close
>>> > >   end
>>> > > end
>>> > > -----------------------------------------------------
>>> > >
>>> > >   My purpose is to union an infinite real-time data stream with
>>> filtered persistent data, where the filtering condition is provided by
>>> external requests. And only the result of the union is of interest. In
>>> this case I guess I need a way to terminate the stream. Maybe I am wrong.
>>> > >
>>> > >   Moreover, it should be possible to link the streams to the next
>>> request with other filtering criteria, that is, to create a new data
>>> transformation chain after running env.execute("WordCount Example"). Is it
>>> possible now? If not, is it possible with minimal changes to the core of
>>> Flink?
>>> > >
>>> > > Regards,
>>> > > Roman
>>> > >
>>> > > 2015-11-09 12:34 GMT+04:00 Stephan Ewen <sewen@apache.org>:
>>> > > Hi!
>>> > >
>>> > > If you want to work on subsets of streams, the answer is usually to
>>> use windows, "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
>>> > >
>>> > > The transformations that you want to make, do they fit into a window
>>> function?
>>> > >
>>> > > There are thoughts to introduce something like global time windows
>>> across the entire stream, inside which you can work more in a batch-style,
>>> but that is quite an extensive change to the core.
>>> > >
>>> > > Greetings,
>>> > > Stephan
>>> > >
>>> > >
>>> > > On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssdev10@gmail.com>
wrote:
>>> > > Hello,
>>> > >
>>> > >
>>> > > I need to extract a finite subset, like a data buffer, from an
>>> infinite data stream. The best way for me would be to obtain a finite
>>> stream with the data accumulated during the last minute (as an example),
>>> but I have not found any existing technique to do it.
>>> > >
>>> > >
>>> > > As possible ways to do something close to a stream's subset, I see
>>> the following cases:
>>> > >
>>> > > - some transformation operation like 'take_while' that produces a new
>>> stream but is able to switch it to the FINISHED state. Unfortunately, I
>>> did not find how to switch the state of a stream from the user code of
>>> transformation functions;
>>> > >
>>> > > - new DataStream or StreamSource constructors which allow connecting
>>> a data processing chain to the source stream. It may be something like the
>>> mentioned take_while transform function or a modified StreamSource.run
>>> method fed with data from the source stream.
>>> > >
>>> > >
>>> > > That is, I have two questions.
>>> > >
>>> > > 1) Is there any technique to extract accumulated data from a stream
>>> as a stream (to union it with another stream)? This would be like a pure
>>> buffer mode.
>>> > >
>>> > > 2) If the answer to the first question is negative, is there
>>> something like a take_while transformation, or should I think about a
>>> custom implementation of it? Is it possible to implement it without
>>> modifying the core of Flink?
>>> > >
>>> > >
>>> > > Regards,
>>> > >
>>> > > Roman
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> >
>>> >
>>>
>>>
>>
>
