flink-user mailing list archives

From: rss rss <rssde...@gmail.com>
Subject: Re: finite subset of an infinite data stream
Date: Thu, 26 Nov 2015 10:06:47 GMT
Hello Robert,

  thank you for the answer.

  I understand the idea of using streams, but I don't see how to apply it
to my task. Aljoscha wrote an example of extracting parts of the data via
external queries. It is very useful, but it is not enough on its own.

  I have conditional queries. A simple example: 'get data from a specified
period for a specified client (or sensor, or something else)'. The Flink
streaming API allows me to configure a stream that reads from persistent
storage, e.g. a DBMS with an SQL interface. In that case I have to
configure the stream with an SQL query in a constructor, such as 'select *
from data where timestamp > now() - 1day AND clientId == id'. But '1 day'
and 'id' are parameters that come from the input query. I am not able to
preconfigure all possible streams to produce all possible data
combinations. Therefore I decided to use the batch part to run a
transformation process for each input query with its list of conditions.
If it were possible, I would be glad to use the streaming API only, but I
have no ideas other than using the batch API.
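
  As far as I understand the connected-stream example that Robert
mentioned earlier, it would let me apply the query parameters to the
real-time part roughly as in the sketch below. The class and field names
(QueryRequest, SensorEvent, clientId, fromTimestamp) are invented here for
illustration only. The sketch keeps the latest query parameters as state
and filters the data stream against them; it does not run the
corresponding query against persistent storage, which is why I still fall
back to the batch API for that part.

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// hypothetical record types, not from my actual project
class SensorEvent  { String clientId; long timestamp; String payload; }
class QueryRequest { String clientId; long fromTimestamp; }

public class QueryFilter
        implements CoFlatMapFunction<SensorEvent, QueryRequest, SensorEvent> {

    // latest filter condition received from the query stream
    private QueryRequest currentQuery;

    @Override
    public void flatMap1(SensorEvent event, Collector<SensorEvent> out) throws Exception {
        // data stream side: forward only the events matching the current query
        if (currentQuery != null
                && event.clientId.equals(currentQuery.clientId)
                && event.timestamp >= currentQuery.fromTimestamp) {
            out.collect(event);
        }
    }

    @Override
    public void flatMap2(QueryRequest query, Collector<SensorEvent> out) throws Exception {
        // query stream side: remember the newest parameters
        currentQuery = query;
    }
}

// wiring, assuming both streams already exist:
//   dataStream.connect(queryStream).flatMap(new QueryFilter());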

Best regards,
Roman


2015-11-25 21:51 GMT+04:00 Robert Metzger <rmetzger@apache.org>:

> Hi Roman,
>
> I've looked through your code here:
> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/Consumer.java
>
> The implementation is not very efficient. Flink is not like Spark, where
> the job's driver constantly interacts with the program running in the
> cluster.
> Flink generates a plan from your code and submits it to the cluster
> [1]. Your code submits a new plan to the cluster for every message read
> from Kafka.
> At that point it would be faster to just process the data locally.
>
> I still think that you can use the DataStream API of Flink. Maybe use this
> example as a starting point:
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("zookeeper.connect", "localhost:2181");
> properties.setProperty("group.id", "test");
>
> DataStream<String> stream = env
> 	.addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
>
>
> then, use the "stream" object to perform your transformations.
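>
> For example, a very rough sketch of such a transformation could look like
> this; the filter condition below is only a placeholder, not something
> from your code:
>
> DataStream<String> filtered = stream
> 	.filter(new FilterFunction<String>() {
> 		@Override
> 		public boolean filter(String value) {
> 			// placeholder condition; replace with your real parsing/filtering
> 			return value.contains("clientId=42");
> 		}
> 	});
> filtered.print();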
>
> The output format looks good!
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/internals/general_arch.html
>
>
> On Sun, Nov 22, 2015 at 8:52 PM, rss rss <rssdev10@gmail.com> wrote:
>
>> Hello,
>>
>>   I have prepared a prototype of the batch subsystem described in
>> the previous mail:
>> https://github.com/rssdev10/flink-experiments/tree/init_by_kafka_sink_kafka
>> It does not contain correct Kafka serialization/deserialization yet,
>> because I have not figured out how to do it. But it contains code that
>> runs Flink batch processing for each message from a Kafka queue -
>> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/Consumer.java
>> . This code is based on Kafka's examples.
>>
>>   My question is the following: is this a correct way to trigger the
>> Flink batch API? See the method Consumer::run.
>>
>>   I also added a sink for Kafka -
>> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/KafkaOutputFormat.java
>> Is it correct? Unfortunately, the Flink documentation does not contain
>> examples of a custom sink implementation:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#data-sinks
>>
>>   The program works at least in the mode where a single queue is looped
>> back on itself via its own consumer-producer, activated by a single
>> message sent from Kafka's console producer.
>>
>> Best regards,
>> Roman
>>
>> 2015-11-20 16:05 GMT+03:00 rss rss <rssdev10@gmail.com>:
>>
>>> Hello Aljoscha,
>>>
>>>   Thanks, I looked at your code. I think it is really useful for
>>> getting real-time data from sensors, and as a simple example it can be
>>> considered in a modern Internet of Things context. E.g. there are some
>>> temperature or water-flow sensors, and I want to build a simple
>>> application where the data flowing from the sensors is saved to
>>> persistent storage, while a small real-time buffer is used for
>>> visualizing the latest data on a display in response to a query.
>>>
>>>   But at the same time my question has a second part. I need to link
>>> the real-time data with data from persistent storage, and I don't see
>>> how your example helps with that. Suppose the input query contains some
>>> data-fetch condition. In this case we have to build a separate DataSet
>>> or DataStream against persistent storage with the specified conditions.
>>> It may be SQL or a simple map(...).filter("something"). But the main
>>> obstacle is how to configure a new data-processing topology from inside
>>> the current stream transformation, e.g. from inside the map function of
>>> the connected query stream.
>>>
>>>   A week ago I prepared another schema for solving my task, with
>>> separate streaming and batch subsystems. See the attached image. It may
>>> be changed according to your example, but I don't see any other way to
>>> solve the task than issuing separate queries to persistent storage in
>>> the batch part.
>>>
>>> [image: Embedded image 1]
>>>
>>>   And note, this schema describes an idea of how to emulate a
>>> real-time buffer by means of Kafka. The windowed stream infinitely
>>> produces data sequences and sinks them into an external queue with a
>>> limited retention time, or with no retention at all. Any consumer
>>> connected to the queue receives the current data. I don't like this
>>> idea because of the extra network communication, but it looks workable.
>>>
>>>   BTW: it is something like a lambda/kappa architecture
>>> implementation. I don't like these terms, but that is what it actually
>>> is.
>>>
>>> Best regards,
>>> Roman
>>>
>>>
>>> 2015-11-20 13:26 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>
>>>> Hi,
>>>> I’m very sorry, yes you would need my custom branch:
>>>> https://github.com/aljoscha/flink/commits/state-enhance
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>> > On 20 Nov 2015, at 10:13, rss rss <rssdev10@gmail.com> wrote:
>>>> >
>>>> > Hello Aljoscha,
>>>> >
>>>> >   many thanks. I tried to build your example but ran into an
>>>> obstacle with the org.apache.flink.runtime.state.AbstractStateBackend
>>>> class. Where can I get it? I guess it is stored in your local branch
>>>> only. Would you please send me patches against the public branch, or
>>>> share the branch with me?
>>>> >
>>>> > Best regards,
>>>> > Roman
>>>> >
>>>> >
>>>> > 2015-11-18 17:24 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>> > Hi,
>>>> > I wrote a little example that could be what you are looking for:
>>>> https://github.com/dataArtisans/query-window-example
>>>> >
>>>> > It basically implements a window operator with a modifiable window
>>>> size that also allows querying the current accumulated window contents
>>>> using a second input stream.
>>>> >
>>>> > There is a README file in the github repository, but please let me
>>>> know if you need further explanations.
>>>> >
>>>> > Cheers,
>>>> > Aljoscha
>>>> >
>>>> > > On 18 Nov 2015, at 12:02, Robert Metzger <rmetzger@apache.org>
>>>> wrote:
>>>> > >
>>>> > > Hi Roman,
>>>> > >
>>>> > > I've updated the documentation. It seems that it got out of sync.
>>>> Thank you for notifying us about this.
>>>> > >
>>>> > > My colleague Aljoscha has some experimental code that is probably
>>>> doing what you are looking for: A standing window (your RT-buffer) that you
>>>> can query using a secondary stream (your user's queries).
>>>> > > He'll post the code soon to this email thread.
>>>> > >
>>>> > > Regards,
>>>> > > Robert
>>>> > >
>>>> > >
>>>> > > On Wed, Nov 11, 2015 at 2:51 PM, rss rss <rssdev10@gmail.com>
>>>> wrote:
>>>> > > Hello,
>>>> > >
>>>> > >   thanks, Stephan, but triggers are not what I was looking for.
>>>> And BTW, the documentation is out of date: there is no Count class
>>>> anymore, I found only CountTrigger.
>>>> > >
>>>> > >   Thanks Robert, your example may be useful for me, but at some
>>>> other point. By "union" I meant an ordinary union of similar data, the
>>>> same as "union" in the DataStream documentation.
>>>> > >
>>>> > >   The task is very simple. We have an infinite stream of data
>>>> from sensors, a billing system, etc. It does not matter what it is,
>>>> but it is infinite. We have to store the data in some persistent
>>>> storage to be able to run analytical queries later. And there is a
>>>> stream of users' analytical queries. But the input data stream is big,
>>>> the time needed to save it to persistent storage is long too, and we
>>>> do not have a very fast big-data OLTP storage. That means the data
>>>> extracted from persistent storage for the users' requests will
>>>> probably not contain the most recent data. We have to keep some
>>>> real-time buffer (RT-Buffer in the schema) with the current data and
>>>> union it with the processing results from persistent storage (I am not
>>>> talking about data deduplication and ordering now). And of course the
>>>> users' queries are unpredictable regarding data-filtering conditions.
>>>> > >
>>>> > >   The attached schema is an attempt to understand how it may be
>>>> implemented with Flink. I tried to imagine how to implement it with
>>>> Flink's streaming API but ran into obstacles. This schema is not the
>>>> first variant; it contains a separate driver program that configures
>>>> new jobs based on the users' queries. The reason is that I have not
>>>> found a way to link the stream of users' queries with the subsequent
>>>> data processing. But it is somewhat close to
>>>> https://gist.github.com/fhueske/4ea5422edb5820915fa4
>>>> > >
>>>> > >
>>>> > > <flink_streams.png>
>>>> > >
>>>> > >   The main question is how to process each user's query by
>>>> combining the current data from the real-time buffer with a batch
>>>> request to persistent storage. Unfortunately, I have not found a
>>>> solution using the Streaming API alone.
>>>> > >
>>>> > > Regards,
>>>> > > Roman
>>>> > >
>>>> > > 2015-11-11 15:45 GMT+04:00 Robert Metzger <rmetzger@apache.org>:
>>>> > > I think what you call "union" is a "connected stream" in Flink.
>>>> Have a look at this example:
>>>> https://gist.github.com/fhueske/4ea5422edb5820915fa4
>>>> > > It shows how to dynamically update a list of filters by external
>>>> requests.
>>>> > > Maybe that's what you are looking for?
>>>> > >
>>>> > >
>>>> > >
>>>> > > On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <sewen@apache.org>
>>>> wrote:
>>>> > > Hi!
>>>> > >
>>>> > > I do not really understand what exactly you want to do, especially
>>>> the "union an infinite real time data stream with filtered persistent
>>>> data where the condition of filtering is provided by external
>>>> requests".
>>>> > >
>>>> > > If you want to work on substreams in general, there are two options:
>>>> > >
>>>> > > 1) Create the substream in a streaming window. You can "cut" the
>>>> stream based on special records/events that signal that the subsequence is
>>>> done. Have a look at the "Trigger" class for windows, it can react to
>>>> elements and their contents:
>>>> > >
>>>> > >
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
>>>> (section on Advanced Windowing).
>>>> > >
>>>> > >
>>>> > > 2) You can trigger sequences of batch jobs. The batch job data
>>>> source (input format) can decide when to stop consuming the stream, at
>>>> which point the remainder of the transformations run, and the batch job
>>>> finishes.
>>>> > > You can already run new transformation chains after each call to
>>>> "env.execute()", once the execution has finished, to implement the
>>>> sequence of batch jobs.
>>>> > >
>>>> > >
>>>> > > I would try and go for the windowing solution if that works,
>>>> because that will give you better fault tolerance / high availability. In
>>>> the repeated batch jobs case, you need to worry yourself about what happens
>>>> when the driver program (that calls env.execute()) fails.
>>>> > >
>>>> > >
>>>> > > Hope that helps...
>>>> > >
>>>> > > Greetings,
>>>> > > Stephan
>>>> > >
>>>> > >
>>>> > >
>>>> > > On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssdev10@gmail.com>
>>>> wrote:
>>>> > > Hello,
>>>> > >
>>>> > >   thanks for the answer, but windows produce periodic results. I
>>>> used your example, but the data source is changed to a TCP stream:
>>>> > >
>>>> > >         DataStream<String> text = env.socketTextStream("localhost", 2015, '\n');
>>>> > >         DataStream<Tuple2<String, Integer>> wordCounts =
>>>> > >                 text
>>>> > >                 .flatMap(new LineSplitter())
>>>> > >                 .keyBy(0)
>>>> > >                 .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>>> > >                 .sum(1);
>>>> > >
>>>> > >         wordCounts.print();
>>>> > >         env.execute("WordCount Example");
>>>> > >
>>>> > >  I see results being printed infinitely instead of only a single list.
>>>> > >
>>>> > >  The data source is the following script:
>>>> > > -----------------------------------------------------
>>>> > > #!/usr/bin/env ruby
>>>> > >
>>>> > > require 'socket'
>>>> > >
>>>> > > server = TCPServer.new 2015
>>>> > > loop do
>>>> > >   Thread.start(server.accept) do |client|
>>>> > >     puts Time.now.to_s + ': New client!'
>>>> > >     loop do
>>>> > >       client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
>>>> > >       sleep rand(1000)/1000.0
>>>> > >     end
>>>> > >     client.close
>>>> > >   end
>>>> > > end
>>>> > > -----------------------------------------------------
>>>> > >
>>>> > >   My purpose is to union an infinite real-time data stream with
>>>> filtered persistent data, where the filtering condition is provided by
>>>> external requests, and only the result of the union is of interest. In
>>>> this case I guess I need a way to terminate the stream. Maybe I am
>>>> wrong.
>>>> > >
>>>> > >   Moreover, it should be possible to link the streams again for
>>>> the next request with other filtering criteria, that is, to create a
>>>> new data transformation chain after running env.execute("WordCount
>>>> Example"). Is that possible now? If not, is it possible with minimal
>>>> changes to the core of Flink?
>>>> > >
>>>> > > Regards,
>>>> > > Roman
>>>> > >
>>>> > > 2015-11-09 12:34 GMT+04:00 Stephan Ewen <sewen@apache.org>:
>>>> > > Hi!
>>>> > >
>>>> > > If you want to work on subsets of streams, the answer is usually
>>>> to use windows, "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
>>>> > >
>>>> > > The transformations that you want to make, do they fit into a
>>>> window function?
>>>> > >
>>>> > > There are thoughts to introduce something like global time windows
>>>> across the entire stream, inside which you can work more in a batch-style,
>>>> but that is quite an extensive change to the core.
>>>> > >
>>>> > > Greetings,
>>>> > > Stephan
>>>> > >
>>>> > >
>>>> > > On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssdev10@gmail.com>
>>>> wrote:
>>>> > > Hello,
>>>> > >
>>>> > >
>>>> > > I need to extract a finite subset, like a data buffer, from an
>>>> infinite data stream. The best way for me would be to obtain a finite
>>>> stream with the data accumulated over the last minute (as an example).
>>>> But I have not found any existing technique to do it.
>>>> > >
>>>> > >
>>>> > > As possible ways to do something close to a stream's subset, I
>>>> see the following cases:
>>>> > >
>>>> > > - a transformation operation like 'take_while' that produces a new
>>>> stream but is able to switch it to the FINISHED state. Unfortunately,
>>>> I have not found how to switch the state of a stream from the user
>>>> code of transformation functions;
>>>> > >
>>>> > > - new DataStream or StreamSource constructors that allow
>>>> connecting a data processing chain to the source stream. It may be
>>>> something like the mentioned take_while transform function, or a
>>>> modified StreamSource.run method fed with data from the source stream.
>>>> > >
>>>> > >
>>>> > > That is, I have two questions.
>>>> > >
>>>> > > 1) Is there any technique to extract accumulated data from a
>>>> stream as a stream (to union it with another stream)? This would be
>>>> like a pure buffer mode.
>>>> > >
>>>> > > 2) If the answer to the first question is negative, is there
>>>> something like a take_while transformation, or should I think about a
>>>> custom implementation of it? Is it possible to implement it without
>>>> modifying the core of Flink?
>>>> > >
>>>> > >
>>>> > > Regards,
>>>> > >
>>>> > > Roman
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> >
>>>> >
>>>>
>>>>
>>>
>>
>
