From: Fabian Hueske
Date: Thu, 28 Apr 2016 14:47:37 +0200
Subject: Re: General Data questions - streams vs batch
To: user@flink.apache.org

True, flatMap does not have access to watermarks.
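At the operator level, however, watermarks are visible. As a rough sketch of the approach suggested in this thread (extending the built-in StreamFlatMap operator and overriding processWatermark()): note that this targets Flink's internal operator API around the 1.0 release, so class names may differ across versions, and BatchingFlatMapFunction with its flush() hook is a hypothetical name for a user function that buffers elements.

```
// Sketch only - Flink's operator API is internal and version-dependent.
// Idea: reuse StreamFlatMap's element handling, but flush any partially
// filled batch when a watermark arrives (including the final watermark
// that marks the end of a finite stream).
public class FlushingFlatMap<IN, OUT> extends StreamFlatMap<IN, OUT> {

    public FlushingFlatMap(BatchingFlatMapFunction<IN, OUT> function) {
        super(function);  // StreamFlatMap drives the regular flatMap() calls
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // flush() is a hypothetical hook on the user function that emits
        // whatever is left in its batch buffer.
        ((BatchingFlatMapFunction<IN, OUT>) userFunction).flush();
        super.processWatermark(mark);  // forward the watermark downstream
    }
}
```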
You can also go a bit more to the low level and directly implement an AbstractStreamOperator that implements the OneInputStreamOperator interface.
This is essentially the base class for the built-in stream operators, and it has access to watermarks (OneInputStreamOperator.processWatermark()).

Maybe the easiest is to simply extend StreamFlatMap and override the processWatermark() method.

Cheers, Fabian

2016-04-28 14:40 GMT+02:00 Konstantin Kulagin <kkulagin@gmail.com>:

> Thanks Fabian,
>
> works like a charm except the case when the stream is finite (or I have a
> dataset from the beginning).
>
> In this case I need to somehow identify that the stream is finished and
> emit the latest batch (which might have fewer elements) to the output.
> What is the best way to do that? In streams and windows we have support
> for watermarks, but I do not see similar stuff for a flatMap operation.
>
> In the sample below I need to emit the values from 30 to 32 as well:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple2<Long, String>> source = env.fromCollection(
>     LongStream.range(0, 33).mapToObj(l ->
>         Tuple2.of(l, "This is " + l)).collect(Collectors.toList()));
>
> source.flatMap(new RichFlatMapFunction<Tuple2<Long, String>, Tuple2<Long, String>>() {
>     List<Tuple2<Long, String>> cache = new ArrayList<>();
>
>     @Override
>     public void flatMap(Tuple2<Long, String> value,
>                         Collector<Tuple2<Long, String>> out) throws Exception {
>         cache.add(value);
>         if (cache.size() == 5) {
>             System.out.println("!!!!! " + Thread.currentThread().getId()
>                 + ": " + Joiner.on(",").join(cache));
>             cache.stream().forEach(out::collect);
>             cache.clear();
>         }
>     }
> }).setParallelism(2).print();
>
> env.execute("yoyoyo");
>
> Output (Flink-related stuff excluded):
>
> !!!!! 35: (1,This is 1),(3,This is 3),(5,This is 5),(7,This is 7),(9,This is 9)
> !!!!! 36: (0,This is 0),(2,This is 2),(4,This is 4),(6,This is 6),(8,This is 8)
> !!!!! 35: (11,This is 11),(13,This is 13),(15,This is 15),(17,This is 17),(19,This is 19)
> !!!!! 36: (10,This is 10),(12,This is 12),(14,This is 14),(16,This is 16),(18,This is 18)
> !!!!! 35: (21,This is 21),(23,This is 23),(25,This is 25),(27,This is 27),(29,This is 29)
> !!!!! 36: (20,This is 20),(22,This is 22),(24,This is 24),(26,This is 26),(28,This is 28)
>
> And if you can give a bit more info on why I will have latency issues in
> the case of a varying arrival rate of elements, that would be perfect. Or
> point me to a place where I can read about it.
>
> Thanks!
> Konstantin.
>
> On Thu, Apr 28, 2016 at 7:26 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi Konstantin,
>>
>> if you do not need a deterministic grouping of elements, you should not
>> use a keyed stream or window.
>> Instead you can do the lookups in a parallel flatMap function. The
>> function would collect arriving elements and perform a lookup query after
>> a certain number of elements has arrived (this can cause high latency if
>> the arrival rate of elements is low or varies).
>> The flatMap function can be executed in parallel and does not require a
>> keyed stream.
>>
>> Best, Fabian
>>
>> 2016-04-25 18:58 GMT+02:00 Konstantin Kulagin <kkulagin@gmail.com>:
>>
>>> As usual - thanks for the answers, Aljoscha!
>>>
>>> I think I understood what I wanted to know.
>>>
>>> 1) To add a few comments: about streams, I was thinking about something
>>> similar to Storm, where you can have one Source and 'duplicate' the same
>>> entry going through different 'paths'.
>>> Something like this:
>>> https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.4/bk_storm-user-guide/content/figures/1/figures/SpoutsAndBolts.png
>>> And later you can 'join' these separate streams back.
>>> And actually I think this is what I meant:
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/JoinedStreams.html
>>> - this one actually 'joins' by window.
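For illustration, a windowed join with JoinedStreams looks roughly like the following sketch. The stream and field names are made up, and window-assigner class names vary across Flink releases:

```
// Sketch: join two streams on a shared key over tumbling 5-second windows.
// 'left' and 'right' are hypothetical DataStream<Tuple2<Long, String>> inputs.
DataStream<String> joined = left
    .join(right)
    .where(t -> t.f0)     // key selector for the left stream
    .equalTo(t -> t.f0)   // key selector for the right stream
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .apply((l, r) -> l.f1 + " | " + r.f1);
```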
>>>
>>> As for the 'exactly-once guarantee', I got the difference from this paper:
>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink
>>> - Thanks!
>>>
>>> 2) understood, thank you very much
>>>
>>> I'll probably bother you one more time with another question:
>>>
>>> 3) Let's say I have a Source which provides raw (i.e. non-keyed) data.
>>> And let's say I need to 'enhance' each entry with some fields which I can
>>> take from a database.
>>> So I define some DbEnhanceOperation.
>>>
>>> A database query might be expensive, so I would want to
>>> a) batch entries to perform queries
>>> b) be able to have several parallel DbEnhanceOperations so those will
>>> not slow down my whole processing.
>>>
>>> I do not see a way to do that.
>>>
>>> Problems:
>>>
>>> I cannot go with countWindowAll because of b) - that operation does not
>>> support parallel execution (correct?)
>>>
>>> So I need to create a windowed stream, and for that I need to have some
>>> key - correct? I.e. I cannot create windows on a stream of general
>>> objects just using the number of objects.
>>>
>>> I probably can 'emulate' a keyed stream by providing some 'fake' key.
>>> But in this case I can parallelize only on different keys. Again - it is
>>> probably doable by introducing some AtomicLong key generator in the first
>>> place (this part is probably hard to understand - I can share details if
>>> necessary) but it still looks like a bit of a hack :)
>>>
>>> But the general question - can I implement 3) 'normally', in a
>>> Flink way?
>>>
>>> Thanks!
>>> Konstantin.
>>>
>>> On Mon, Apr 25, 2016 at 10:53 AM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> I'll try and answer your questions separately.
>>>> First, a general remark: although Flink has the DataSet API for batch
>>>> processing and the DataStream API for stream processing, we only have
>>>> one underlying streaming execution engine that is used for both. Now,
>>>> regarding the questions:
>>>>
>>>> 1) What do you mean by "parallel into 2 streams"? Maybe that could
>>>> influence my answer, but I'll just give a general answer: Flink does not
>>>> give any guarantees about the ordering of elements in a Stream or in a
>>>> DataSet. This means that merging or unioning two streams/data sets will
>>>> just mean that operations see all elements of the two merged streams,
>>>> but the order in which we see them is arbitrary. This means that we
>>>> don't keep buffers based on time or size or anything.
>>>>
>>>> 2) The elements that flow through the topology are not tracked
>>>> individually; each operation just receives elements, updates state, and
>>>> sends elements to the downstream operation. In essence this means that
>>>> elements themselves don't block any resources except if they alter some
>>>> kept state in operations. If you have a stateless pipeline that only has
>>>> filters/maps/flatMaps, then the amount of required resources is very low.
>>>>
>>>> For a finite data set, elements are also streamed through the topology.
>>>> Only if you use operations that require grouping or sorting (such as
>>>> groupBy/reduce and join) will elements be buffered in memory or on disk
>>>> before they are processed.
>>>>
>>>> To answer your last question: if you only do stateless
>>>> transformations/filters, then you are fine to use either API, and the
>>>> performance should be similar.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Sun, 24 Apr 2016 at 15:54 Konstantin Kulagin <kkulagin@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I have some kind of a general question, in order to get more
>>>>> understanding of stream vs. finite data transformation. More
>>>>> specifically, I am trying to understand the entities' lifecycle during
>>>>> processing.
>>>>>
>>>>> 1) For example in the case of streams: suppose we start with some
>>>>> key-value source and parallel it into 2 streams by key. Each stream
>>>>> modifies the entries' values, let's say adds some fields. And we want
>>>>> to merge them back later. How does that happen?
>>>>> Will the merging point keep some finite buffer of entries? Based on
>>>>> time or size?
>>>>>
>>>>> I understand that the right solution in this case would probably be
>>>>> having one stream and achieving more performance by increasing
>>>>> parallelism, but what if I have 2 sources from the beginning?
>>>>>
>>>>> 2) Also I assume that in the case of streaming, each entry is
>>>>> considered 'processed' once it passes the whole chain and is emitted
>>>>> into some sink, so after that it does not consume resources. Basically
>>>>> similar to what Storm is doing.
>>>>> But in the case of finite data (data sets): how big an amount of data
>>>>> will the system keep in memory? The whole set?
>>>>>
>>>>> I probably have some example of a dataset vs. stream 'mix': I need to
>>>>> *transform* a big but finite chunk of data. I don't really need to do
>>>>> any 'joins', grouping or something like that, so I never need to store
>>>>> the whole dataset in memory/storage. What would my choice be in this
>>>>> case?
>>>>>
>>>>> Thanks!
>>>>> Konstantin
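The pattern at the heart of this thread (collect fixed-size batches, run one lookup per full batch, and flush the smaller trailing batch when a finite input ends) can be sketched in plain Java, independent of any Flink API; Batcher and its callback are made-up names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Collects elements into fixed-size batches and hands each full batch to
// a consumer (e.g. a database lookup). flush() emits the smaller trailing
// batch - the role that processWatermark()/close() plays inside Flink.
class Batcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> onBatch;
    private final List<T> buffer = new ArrayList<>();

    Batcher(int batchSize, Consumer<List<T>> onBatch) {
        this.batchSize = batchSize;
        this.onBatch = onBatch;
    }

    void add(T element) {
        buffer.add(element);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    // Call once when the finite input is exhausted.
    void flush() {
        if (!buffer.isEmpty()) {
            onBatch.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

With 33 elements and a batch size of 5, this produces six full batches plus one trailing batch of three, covering the values 30 to 32 that the flatMap in the thread never emitted.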