From: Stephan Ewen <sewen@apache.org>
To: user@flink.apache.org
Date: Wed, 18 Nov 2015 15:16:01 +0100
Subject: Re: Published test artifacts for flink streaming

There is no global order in parallel streams; it is something that applications need to work with. We are thinking about adding operations to introduce event-time order (at the cost of some delay), but those are only plans at this point.

What I do in my tests is run the test streams in parallel, but the sink with DOP 1. The sink gathers the elements in a list, and the close() function validates the result.

Validating the results may involve sorting the list in which the elements were gathered (to make the order deterministic), or using a hash set if it is only about distinct counts.

Hope that helps.
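The order-insensitive comparison described above can be sketched in plain Java; the class and method names here are illustrative, not a Flink API, and no Flink dependency is needed for the comparison logic itself:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

public class OrderInsensitiveCheck {

    // Compare collected vs. expected regardless of arrival order
    // by sorting copies of both lists before checking equality.
    static boolean sameElements(List<String> collected, List<String> expected) {
        List<String> a = new ArrayList<>(collected);
        List<String> b = new ArrayList<>(expected);
        Collections.sort(a);
        Collections.sort(b);
        return a.equals(b);
    }

    // When only the distinct elements matter, a HashSet drops both
    // ordering and duplicates before the comparison.
    static boolean sameDistinctElements(List<String> collected, List<String> expected) {
        return new HashSet<>(collected).equals(new HashSet<>(expected));
    }

    public static void main(String[] args) {
        // Elements as a parallel sink might have gathered them: out of order.
        List<String> collected = Arrays.asList("b", "a", "c", "a");
        System.out.println(sameElements(collected, Arrays.asList("a", "a", "b", "c")));
        System.out.println(sameDistinctElements(collected, Arrays.asList("c", "b", "a")));
    }
}
```

In a Flink test, these checks would run inside the DOP-1 sink's close() method over its gathered list.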
On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk wrote: > On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk wrote: > >> Thanks Stephan, I'll check that out in the morning. Generally speaking, >> it would be great to have some single-jvm example tests for those of us >> getting started. Following the example of WindowingIntegrationTest is >> mostly working, though reusing my single sink instance with its static >> collection results in non-deterministic results; there appears to be a race >> between instances clearing the collection in their open method and the >> runtime returning the collection to my test harness. > > > This inconsistent test result is pretty frustrating. I've created a sample > project with an IT that demonstrates the issue. Run `mvn test` multiple > times and see that sometimes it passes and sometimes it fails. Maybe > someone has some thoughts? > > https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6 > > Thanks, > Nick > > I'd also appreciate some guidance on stream composition. It's nice to use >> the fluent API when exploring data in a shell, but it seems to me like that >> API is cumbersome when composing data pipelines of reusable partials. Or >> maybe I'm doing it all wrong... Hence the request for more examples :) >> >> While I'm asking, how might you model this: I have a set of predicates >> I'd like to flatMap over a stream. An input item should be compared vs >> every predicate (basically, I want a Clojure juxt of predicates over each >> stream element). Imagine those predicates expressed as where clauses via >> the Table API. Say I have hundreds of thousands of these predicates to run >> over every stream event. Is the java client API rich enough to express such >> a flow, or should I examine something lower than DataStream? >> >> Thanks a lot, and sorry for all the newb questions. >> -n >> >> >> On Thursday, November 5, 2015, Stephan Ewen wrote: >> >>> Hey! 
>>>
>>> There is also a collect() sink in the "flink-streaming-contrib" project, see here:
>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>
>>> It should work well locally for testing. In that case you can write a program as usual and use "DataStreamUtils.collect(stream)", though you need to stop reading it once you know the stream is exhausted...
>>>
>>> Stephan
>>>
>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> It seems "type" was what I needed, though it also looks like the test
>>>> jar has an undeclared dependency. In the end, the following allowed me
>>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>
>>>> -n
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-streaming-core</artifactId>
>>>>       <version>${flink.version}</version>
>>>>       <type>test-jar</type>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-test-utils</artifactId>
>>>>       <version>${flink.version}</version>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>
>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger wrote:
>>>> > Hi Nick,
>>>> >
>>>> > we are usually publishing the test artifacts. Can you try and replace the
>>>> > <classifier> tag by <type>test-jar</type>:
>>>> >
>>>> > <dependency>
>>>> >   <groupId>org.apache.flink</groupId>
>>>> >   <artifactId>flink-streaming-core</artifactId>
>>>> >   <version>${flink.version}</version>
>>>> >   <type>test-jar</type>
>>>> >   <scope>test</scope>
>>>> > </dependency>
>>>> >
>>>> >
>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk wrote:
>>>> >>
>>>> >> Hello,
>>>> >>
>>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>>> >> produce an input stream of Java objects and sink the results into a
>>>> >> collection for verification via JUnit asserts.
>>>> >> StreamExecutionEnvironment provides methods for the former; however,
>>>> >> how to achieve the latter is not evident based on my internet
>>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>>> >> class, i.e., as used by WindowingIntegrationTest. However, this class
>>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>>> >> which is not published to maven.
>>>> >>
>>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>>> >> let me know if I've got it wrong.
>>>> >>
>>>> >> Thanks,
>>>> >> Nick
>>>> >>
>>>> >>     <dependency>
>>>> >>       <groupId>org.apache.flink</groupId>
>>>> >>       <artifactId>flink-streaming-core</artifactId>
>>>> >>       <version>${flink.version}</version>
>>>> >>       <classifier>test</classifier>
>>>> >>       <scope>test</scope>
>>>> >>     </dependency>
>>>> >
>>>> >
>>>
>>>
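Stephan's note in the thread above, that a program using DataStreamUtils.collect(stream) must stop reading once it knows the stream is exhausted, amounts to draining the returned iterator up to a known element count. A minimal sketch of that pattern in plain Java, with an ordinary Iterator standing in for the one Flink returns (so no Flink dependency is needed to run it):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BoundedDrain {

    // Consume at most `expected` elements from an iterator that may
    // never report exhaustion. Stopping after the known count is what
    // "stop reading once you know the stream is exhausted" amounts to
    // in a test.
    static <T> List<T> drain(Iterator<T> it, int expected) {
        List<T> out = new ArrayList<>(expected);
        while (out.size() < expected && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // An endless source: hasNext() never returns false.
        Iterator<Integer> endless = new Iterator<Integer>() {
            int i = 0;
            public boolean hasNext() { return true; }
            public Integer next() { return i++; }
        };
        System.out.println(drain(endless, 3)); // [0, 1, 2]
    }
}
```

In a real test, the drained list would then be handed to JUnit assertions, with the caveat that parallel execution gives no ordering guarantee on its contents.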