flink-user mailing list archives

From rss rss <rssde...@gmail.com>
Subject Re: finite subset of an infinite data stream
Date Sun, 29 Nov 2015 20:07:32 GMT
Hello Stephan,

  many thanks, a RichMapFunction with an overridden "open" method is really
what I need.

Best regards,
Roman

2015-11-27 15:03 GMT+03:00 Stephan Ewen <sewen@apache.org>:

> Hi Roman!
>
> Is it possible to do the following:
>
> You have a queue (Kafka) with the user requests. Then you have a Flink job
> that reads from that queue and does a map() over the query stream. In the
> map() function, you make the call to the database, like this:
>
> val queryStream : DataStream[Query] = readQueriesFromKafka()
>
> val resultStream : DataStream[Result] = queryStream.map(new
> RichMapFunction[Query, Result]() {
>
>   var connection: DbConnection = _
>   var queryStatement: PreparedStatement = _
>
>   override def open(cfg: Configuration) : Unit = {
>     connection = // connect to the database
>     queryStatement = // prepare the query statement
>   }
>
>   def map(query: Query) : Result = {
>     connection.runQuery(queryStatement, query.param1, query.param2, ...)
>   }
> })
>
> Since the queries have quite a bit of latency, you could try to run them
> with a very high parallelism, or use a thread-pool executor or so...
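>
> A minimal Java sketch of that idea (assuming a hypothetical "DbLookupMapper"
> class implementing a RichMapFunction like the one above): give the blocking
> lookup operator a higher parallelism than the rest of the job so that many
> database calls run concurrently.
>
> DataStream<Result> resultStream = queryStream
>     .map(new DbLookupMapper())   // blocking database lookup per query
>     .setParallelism(64);         // run many lookup instances in parallel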
>
> Greetings,
> Stephan
>
>
> On Thu, Nov 26, 2015 at 11:06 AM, rss rss <rssdev10@gmail.com> wrote:
>
>> Hello Robert,
>>
>>   thank you for the answer.
>>
>>   I understand the idea of using streams, but I don't understand how to use
>> it in my task. Aljoscha wrote an example of extracting parts of the data by
>> external queries. It is very useful, but not enough.
>>
>>   I have conditional queries. A simple example: 'get data from the specified
>> period for a specified client (or sensor, or something else)'. The Flink
>> streaming API allows configuring a stream to access a persistent storage,
>> e.g. some DBMS with an SQL interface. In this case I have to configure the
>> stream with an SQL query in a constructor, like 'select * from
>> data where timestamp > now() - 1day AND clientId == id'. But '1 day' and
>> 'id' are parameters from the input query. I am not able to configure all
>> possible streams to produce all possible data combinations. Therefore I
>> decided to use the batch part to run a transformation process for each
>> input query with a list of conditions. If it is possible I would be glad to
>> use the streaming API only, but I see no ideas other than using the batch API.
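>>
>>   For illustration, a minimal JDBC-style sketch of such a parameterized
>> lookup (the field names periodMillis and clientId on the incoming query are
>> assumptions): the SQL text stays fixed and the values from the input query
>> are bound as parameters.
>>
>> // hedged sketch using plain JDBC (java.sql.*); "query" is the incoming request
>> PreparedStatement stmt = connection.prepareStatement(
>>     "SELECT * FROM data WHERE timestamp > ? AND clientId = ?");
>> stmt.setLong(1, System.currentTimeMillis() - query.periodMillis);
>> stmt.setLong(2, query.clientId);
>> ResultSet rs = stmt.executeQuery();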
>>
>> Best regards,
>> Roman
>>
>>
>> 2015-11-25 21:51 GMT+04:00 Robert Metzger <rmetzger@apache.org>:
>>
>>> Hi Roman,
>>>
>>> I've looked through your code here:
>>> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/Consumer.java
>>>
>>> The implementation is not very efficient. Flink is not like Spark, where
>>> the job's driver is constantly interacting with the program running in the
>>> cluster.
>>> Flink will generate a plan from your code and submit it to the cluster
>>> [1]. Your code submits a new plan to the cluster for every message from
>>> Kafka.
>>> It would be faster to process the data locally.
>>>
>>> I still think that you can use the DataStream API of Flink. Maybe use
>>> this example as a starting point:
>>>
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "localhost:9092");
>>> properties.setProperty("zookeeper.connect", "localhost:2181");
>>> properties.setProperty("group.id", "test");
>>> DataStream<String> stream = env
>>>     .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
>>>
>>>
>>> then, use the "stream" object to perform your transformations.
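>>>
>>> For example (just a sketch; the filter predicate is a placeholder), further
>>> operators are chained directly on that object:
>>>
>>> DataStream<String> nonEmpty = stream
>>>     .filter(new FilterFunction<String>() {
>>>         @Override
>>>         public boolean filter(String value) {
>>>             return !value.isEmpty();   // drop empty messages
>>>         }
>>>     });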
>>>
>>> The output format looks good!
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/general_arch.html
>>>
>>>
>>> On Sun, Nov 22, 2015 at 8:52 PM, rss rss <rssdev10@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>>   I have prepared a prototype of the batch subsystem described in
>>>> the previous mail:
>>>> https://github.com/rssdev10/flink-experiments/tree/init_by_kafka_sink_kafka
>>>> It does not contain correct Kafka serialization/deserialization because I
>>>> have not seen how to do that yet. But it contains code for running Flink
>>>> batch processing triggered by a message from a Kafka queue -
>>>> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/Consumer.java
>>>> . This code is based on Kafka's examples.
>>>>
>>>>   The question is the following: is this a correct implementation of
>>>> activating the Flink batch API? See the method Consumer::run.
>>>>
>>>>   Also I added a sink for Kafka -
>>>> https://github.com/rssdev10/flink-experiments/blob/init_by_kafka_sink_kafka/src/main/java/proto/flink/batch/KafkaOutputFormat.java
>>>> Is it correct? Unfortunately the Flink documentation does not contain
>>>> examples of a custom sink implementation:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#data-sinks
>>>> .
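>>>>
>>>>   For reference, a custom batch sink is an OutputFormat implementation
>>>> registered with DataSet#output(...). A minimal hedged sketch (not the code
>>>> from the repository above; a real Kafka sink would create a producer in
>>>> open() and send each record in writeRecord()):
>>>>
>>>> public class MySinkFormat implements OutputFormat<String> {
>>>>     @Override
>>>>     public void configure(Configuration parameters) { /* read settings */ }
>>>>
>>>>     @Override
>>>>     public void open(int taskNumber, int numTasks) throws IOException {
>>>>         // open the connection/producer for this parallel task
>>>>     }
>>>>
>>>>     @Override
>>>>     public void writeRecord(String record) throws IOException {
>>>>         // send one record to the external system
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() throws IOException {
>>>>         // flush and release resources
>>>>     }
>>>> }
>>>>
>>>> // usage: dataSet.output(new MySinkFormat());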
>>>>
>>>>   The program works at least in the mode of one queue looped by its own
>>>> consumer-producer and activated by a single message from the Kafka console.
>>>>
>>>> Best regards,
>>>> Roman
>>>>
>>>> 2015-11-20 16:05 GMT+03:00 rss rss <rssdev10@gmail.com>:
>>>>
>>>>> Hello Aljoscha,
>>>>>
>>>>>   Thanks, I looked at your code. I think it is really useful for getting
>>>>> real-time data from some sensors, and as a simple example it may be
>>>>> considered in a modern Internet of Things context. E.g. there is a
>>>>> temperature sensor or a water-flow sensor, and I want to build a simple
>>>>> application where the data flow from the sensors is saved to persistent
>>>>> storage, but I want to use a small real-time buffer for visualizing on a
>>>>> display by a query.
>>>>>
>>>>>   But at the same time my question has a second part. I need to link
>>>>> the real-time data with data from persistent storage, and I don't see how
>>>>> your example may help with this. Suppose the input query contains some
>>>>> data-fetch condition. In this case we have to build a separate DataSet or
>>>>> DataStream against the persistent storage with the specified conditions.
>>>>> It may be SQL or a simple map(...).filter("something"). But the main
>>>>> obstacle is how to configure a new data processing schema from inside the
>>>>> current stream transformation, e.g. from inside the map function of the
>>>>> connected query stream.
>>>>>
>>>>>   A week ago I prepared another schema for solving my task, with a
>>>>> separation of the streaming and batch subsystems. See the attached image.
>>>>> It may be changed according to your example, but I don't see another way
>>>>> to resolve the task than separate queries to the persistent storage in the
>>>>> batch part.
>>>>>
>>>>> [image: embedded image 1]
>>>>>
>>>>>   And note, this schema describes an idea for emulating a real-time
>>>>> buffer by means of Kafka. The windowed stream infinitely produces data
>>>>> sequences and sinks them into an external queue with a limited retention
>>>>> time, or with no retention at all. Any consumer connected to the queue
>>>>> receives the actual data. I don't like this idea because of the extra
>>>>> network communication, but it looks workable.
>>>>>
>>>>>   BTW: it is something like a lambda/kappa architecture implementation.
>>>>> I don't like these terms, but actually it is.
>>>>>
>>>>> Best regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> 2015-11-20 13:26 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>>>
>>>>>> Hi,
>>>>>> I’m very sorry, yes you would need my custom branch:
>>>>>> https://github.com/aljoscha/flink/commits/state-enhance
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> > On 20 Nov 2015, at 10:13, rss rss <rssdev10@gmail.com> wrote:
>>>>>> >
>>>>>> > Hello Aljoscha,
>>>>>> >
>>>>>> >   many thanks. I tried to build your example but hit an obstacle
>>>>>> > with the org.apache.flink.runtime.state.AbstractStateBackend class.
>>>>>> > Where can I get it? I guess it is stored in your local branch only.
>>>>>> > Would you please send me patches against the public branch or share the
>>>>>> > branch with me?
>>>>>> >
>>>>>> > Best regards,
>>>>>> > Roman
>>>>>> >
>>>>>> >
>>>>>> > 2015-11-18 17:24 GMT+04:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>>>> > Hi,
>>>>>> > I wrote a little example that could be what you are looking for:
>>>>>> > https://github.com/dataArtisans/query-window-example
>>>>>> >
>>>>>> > It basically implements a window operator with a modifiable window
>>>>>> > size that also allows querying the current accumulated window contents
>>>>>> > using a second input stream.
>>>>>> >
>>>>>> > There is a README file in the github repository, but please let me
>>>>>> > know if you need further explanations.
>>>>>> >
>>>>>> > Cheers,
>>>>>> > Aljoscha
>>>>>> >
>>>>>> > > On 18 Nov 2015, at 12:02, Robert Metzger <rmetzger@apache.org> wrote:
>>>>>> > >
>>>>>> > > Hi Roman,
>>>>>> > >
>>>>>> > > I've updated the documentation. It seems that it got out of sync.
>>>>>> > > Thank you for notifying us about this.
>>>>>> > >
>>>>>> > > My colleague Aljoscha has some experimental code that is probably
>>>>>> > > doing what you are looking for: a standing window (your RT-buffer)
>>>>>> > > that you can query using a secondary stream (your user's queries).
>>>>>> > > He'll post the code soon to this email thread.
>>>>>> > >
>>>>>> > > Regards,
>>>>>> > > Robert
>>>>>> > >
>>>>>> > >
>>>>>> > > On Wed, Nov 11, 2015 at 2:51 PM, rss rss <rssdev10@gmail.com> wrote:
>>>>>> > > Hello,
>>>>>> > >
>>>>>> > >   Thanks, Stephan, but triggers are not what I was looking for. And
>>>>>> > > BTW, the documentation is obsolete: there is no Count class now; I
>>>>>> > > found CountTrigger only.
>>>>>> > >
>>>>>> > >   Thanks Robert, your example may be useful to me, but at some other
>>>>>> > > point. I mentioned "union" as an ordinary union of similar data. It is
>>>>>> > > the same as "union" in the DataStream documentation.
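>>>>>> > >
>>>>>> > >   (For instance, assuming two streams of the same hypothetical Event
>>>>>> > > type, such an ordinary union is simply:)
>>>>>> > >
>>>>>> > > // hedged illustration; the stream names are placeholders
>>>>>> > > DataStream<Event> merged = realTimeBuffer.union(storageResults);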
>>>>>> > >
>>>>>> > >   The task is very simple. We have an infinite stream of data from
>>>>>> > > sensors, a billing system, etc. It does not matter what it is, but it
>>>>>> > > is infinite. We have to store the data in some persistent storage to
>>>>>> > > be able to make analytical queries later. And there is a stream of
>>>>>> > > users' analytical queries. But the stream of input data is big, the
>>>>>> > > time to save it into the persistent storage is long too, and we do not
>>>>>> > > have a very fast big-data OLTP storage. That is, the data extracted
>>>>>> > > from the persistent storage by the users' requests probably will not
>>>>>> > > contain the most recent data. We have to keep some real-time buffer
>>>>>> > > (RT-Buffer in the schema) with the actual data and union it with the
>>>>>> > > data processing results from the persistent storage (I am not speaking
>>>>>> > > about data deduplication and ordering now). And of course the users'
>>>>>> > > queries are unpredictable regarding data-filtering conditions.
>>>>>> > >
>>>>>> > >   The attached schema is an attempt to understand how it may be
>>>>>> > > implemented with Flink. I tried to imagine how to implement it with
>>>>>> > > Flink's streaming API but found obstacles. This schema is not the
>>>>>> > > first variant. It contains a separate driver program that configures
>>>>>> > > new jobs from the users' queries, because I have not found a way to
>>>>>> > > link the stream of users' queries with the further data processing.
>>>>>> > > But it is somewhat close to
>>>>>> > > https://gist.github.com/fhueske/4ea5422edb5820915fa4
>>>>>> > >
>>>>>> > >
>>>>>> > > <flink_streams.png>
>>>>>> > >
>>>>>> > >   The main question is how to process each user's query, combining
>>>>>> > > it with the actual data from the real-time buffer and a batch request
>>>>>> > > to the persistent storage. Unfortunately I have not found a solution
>>>>>> > > using the Streaming API only.
>>>>>> > >
>>>>>> > > Regards,
>>>>>> > > Roman
>>>>>> > >
>>>>>> > > 2015-11-11 15:45 GMT+04:00 Robert Metzger <rmetzger@apache.org>:
>>>>>> > > I think what you call "union" is a "connected stream" in Flink.
>>>>>> > > Have a look at this example:
>>>>>> > > https://gist.github.com/fhueske/4ea5422edb5820915fa4
>>>>>> > > It shows how to dynamically update a list of filters by external
>>>>>> > > requests.
>>>>>> > > Maybe that's what you are looking for?
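>>>>>> > >
>>>>>> > > A rough sketch of that connected-stream idea (not the code from the
>>>>>> > > gist; Event, Query, Result and the matches() predicate are placeholder
>>>>>> > > names): one input carries the data, the other carries the query/filter
>>>>>> > > records, and a CoFlatMapFunction handles both.
>>>>>> > >
>>>>>> > > dataStream.connect(queryStream)
>>>>>> > >     .flatMap(new CoFlatMapFunction<Event, Query, Result>() {
>>>>>> > >         private Query current;   // last received query/filter
>>>>>> > >
>>>>>> > >         @Override
>>>>>> > >         public void flatMap1(Event value, Collector<Result> out) {
>>>>>> > >             if (current != null && current.matches(value)) {
>>>>>> > >                 out.collect(new Result(value));   // emit matching data
>>>>>> > >             }
>>>>>> > >         }
>>>>>> > >
>>>>>> > >         @Override
>>>>>> > >         public void flatMap2(Query value, Collector<Result> out) {
>>>>>> > >             current = value;   // update the active condition
>>>>>> > >         }
>>>>>> > >     });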
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > > On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>> > > Hi!
>>>>>> > >
>>>>>> > > I do not really understand what exactly you want to do, especially
>>>>>> > > the "union an infinite real time data stream with filtered persistent
>>>>>> > > data where the condition of filtering is provided by external
>>>>>> > > requests" part.
>>>>>> > >
>>>>>> > > If you want to work on substreams in general, there are two
>>>>>> > > options:
>>>>>> > >
>>>>>> > > 1) Create the substream in a streaming window. You can "cut" the
>>>>>> > > stream based on special records/events that signal that the
>>>>>> > > subsequence is done. Have a look at the "Trigger" class for windows;
>>>>>> > > it can react to elements and their contents (a sketch of such a
>>>>>> > > trigger follows after the two options):
>>>>>> > >
>>>>>> > > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
>>>>>> > > (section on Advanced Windowing).
>>>>>> > >
>>>>>> > >
>>>>>> > > 2) You can trigger sequences of batch jobs. The batch job data
>>>>>> > > source (input format) can decide when to stop consuming the stream,
>>>>>> > > at which point the remainder of the transformations runs and the
>>>>>> > > batch job finishes.
>>>>>> > > You can already run new transformation chains after each call to
>>>>>> > > "env.execute()", once the execution has finished, to implement the
>>>>>> > > sequence of batch jobs.
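>>>>>> > >
>>>>>> > > For option 1), a hedged sketch of such a trigger (written against the
>>>>>> > > current Trigger API, whose method signatures may differ slightly from
>>>>>> > > the 0.10-era one; SensorEvent and isEndMarker() are placeholder
>>>>>> > > names): it fires and purges the window when a special
>>>>>> > > end-of-subsequence record arrives, and does nothing on timers.
>>>>>> > >
>>>>>> > > public class EndOfSubsequenceTrigger extends Trigger<SensorEvent, GlobalWindow> {
>>>>>> > >     @Override
>>>>>> > >     public TriggerResult onElement(SensorEvent element, long timestamp,
>>>>>> > >                                    GlobalWindow window, TriggerContext ctx) {
>>>>>> > >         return element.isEndMarker()
>>>>>> > >                 ? TriggerResult.FIRE_AND_PURGE   // emit the accumulated subsequence
>>>>>> > >                 : TriggerResult.CONTINUE;        // keep accumulating
>>>>>> > >     }
>>>>>> > >
>>>>>> > >     @Override
>>>>>> > >     public TriggerResult onProcessingTime(long time, GlobalWindow window,
>>>>>> > >                                           TriggerContext ctx) {
>>>>>> > >         return TriggerResult.CONTINUE;
>>>>>> > >     }
>>>>>> > >
>>>>>> > >     @Override
>>>>>> > >     public TriggerResult onEventTime(long time, GlobalWindow window,
>>>>>> > >                                      TriggerContext ctx) {
>>>>>> > >         return TriggerResult.CONTINUE;
>>>>>> > >     }
>>>>>> > >
>>>>>> > >     @Override
>>>>>> > >     public void clear(GlobalWindow window, TriggerContext ctx) { }
>>>>>> > > }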
>>>>>> > >
>>>>>> > >
>>>>>> > > I would try and go for the windowing solution if that works, because
>>>>>> > > that will give you better fault tolerance / high availability. In the
>>>>>> > > repeated-batch-jobs case, you need to worry about what happens when
>>>>>> > > the driver program (that calls env.execute()) fails.
>>>>>> > >
>>>>>> > >
>>>>>> > > Hope that helps...
>>>>>> > >
>>>>>> > > Greetings,
>>>>>> > > Stephan
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > > On Mon, Nov 9, 2015 at 1:24 PM, rss rss <rssdev10@gmail.com> wrote:
>>>>>> > > Hello,
>>>>>> > >
>>>>>> > >   Thanks for the answer, but windows produce periodic results. I
>>>>>> > > used your example, but the data source is changed to a TCP stream:
>>>>>> > >
>>>>>> > >         DataStream<String> text =
>>>>>> > >                 env.socketTextStream("localhost", 2015, '\n');
>>>>>> > >         DataStream<Tuple2<String, Integer>> wordCounts =
>>>>>> > >                 text
>>>>>> > >                 .flatMap(new LineSplitter())
>>>>>> > >                 .keyBy(0)
>>>>>> > >                 .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>>>>> > >                 .sum(1);
>>>>>> > >
>>>>>> > >         wordCounts.print();
>>>>>> > >         env.execute("WordCount Example");
>>>>>> > >
>>>>>> > >  I see infinite printing of results instead of a single list.
>>>>>> > >
>>>>>> > >  The data source is the following script:
>>>>>> > > -----------------------------------------------------
>>>>>> > > #!/usr/bin/env ruby
>>>>>> > >
>>>>>> > > require 'socket'
>>>>>> > >
>>>>>> > > server = TCPServer.new 2015
>>>>>> > > loop do
>>>>>> > >   Thread.start(server.accept) do |client|
>>>>>> > >     puts Time.now.to_s + ': New client!'
>>>>>> > >     loop do
>>>>>> > >       client.puts "#{Time.now} #{[*('A'..'Z')].sample(3).join}"
>>>>>> > >       sleep rand(1000)/1000.0
>>>>>> > >     end
>>>>>> > >     client.close
>>>>>> > >   end
>>>>>> > > end
>>>>>> > > -----------------------------------------------------
>>>>>> > >
>>>>>> > >   My purpose is to union an infinite real-time data stream with
>>>>>> > > filtered persistent data, where the filtering condition is provided
>>>>>> > > by external requests, and only the result of the union is of
>>>>>> > > interest. In this case I guess I need a way to terminate the stream.
>>>>>> > > Maybe I am wrong.
>>>>>> > >
>>>>>> > >   Moreover, it should be possible to link the streams for the next
>>>>>> > > request with other filtering criteria, that is, to create a new data
>>>>>> > > transformation chain after running env.execute("WordCount Example").
>>>>>> > > Is that possible now? If not, is it possible with minimal changes to
>>>>>> > > the core of Flink?
>>>>>> > >
>>>>>> > > Regards,
>>>>>> > > Roman
>>>>>> > >
>>>>>> > > 2015-11-09 12:34 GMT+04:00 Stephan Ewen <sewen@apache.org>:
>>>>>> > > Hi!
>>>>>> > >
>>>>>> > > If you want to work on subsets of streams, the answer is usually to
>>>>>> > > use windows, "stream.keyBy(...).timeWindow(Time.of(1, MINUTE))".
>>>>>> > >
>>>>>> > > The transformations that you want to make, do they fit into a
>>>>>> > > window function?
>>>>>> > >
>>>>>> > > There are thoughts to introduce something like global time windows
>>>>>> > > across the entire stream, inside which you can work more in a
>>>>>> > > batch-style, but that is quite an extensive change to the core.
>>>>>> > >
>>>>>> > > Greetings,
>>>>>> > > Stephan
>>>>>> > >
>>>>>> > >
>>>>>> > > On Sun, Nov 8, 2015 at 5:15 PM, rss rss <rssdev10@gmail.com> wrote:
>>>>>> > > Hello,
>>>>>> > >
>>>>>> > >
>>>>>> > > I need to extract a finite subset, like a data buffer, from an
>>>>>> > > infinite data stream. The best way for me would be to obtain a finite
>>>>>> > > stream with the data accumulated during the last minute (as an
>>>>>> > > example). But I have not found any existing technique to do it.
>>>>>> > >
>>>>>> > >
>>>>>> > > As possible ways to do something close to a stream's subset I see
>>>>>> > > the following cases:
>>>>>> > >
>>>>>> > > - some transformation operation like 'take_while' that produces a
>>>>>> > > new stream but is able to switch it to the FINISHED state.
>>>>>> > > Unfortunately I have not found how to switch the state of a stream
>>>>>> > > from user code of the transformation functions;
>>>>>> > >
>>>>>> > > - new DataStream or StreamSource constructors which allow connecting
>>>>>> > > a data processing chain to the source stream. It may be something
>>>>>> > > like the mentioned take_while transform function, or a modified
>>>>>> > > StreamSource.run method fed with data from the source stream.
>>>>>> > >
>>>>>> > >
>>>>>> > > That is, I have two questions.
>>>>>> > >
>>>>>> > > 1) Is there any technique to extract accumulated data from a stream
>>>>>> > > as a stream (to union it with another stream)? This is like a pure
>>>>>> > > buffer mode.
>>>>>> > >
>>>>>> > > 2) If the answer to the first question is negative, is there
>>>>>> > > something like a take_while transformation, or should I think about a
>>>>>> > > custom implementation of it? Is it possible to implement it without
>>>>>> > > modifying the core of Flink?
>>>>>> > >
>>>>>> > >
>>>>>> > > Regards,
>>>>>> > >
>>>>>> > > Roman
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
