From: Nick Dimiduk
Date: Fri, 6 Nov 2015 09:03:39 -0800
Subject: Re: Published test artifacts for flink streaming
To: user@flink.apache.org

Promising observation, Till. Is it possible to access Table API's
select and where operators from within such a flatMap?

-n

On Fri, Nov 6, 2015 at 6:19 AM, Till Rohrmann wrote:
> Hi Nick,
>
> I think a flatMap operation which is instantiated with your list of
> predicates should do the job. Thus, there shouldn’t be a need to dig deeper
> than the DataStream for the first version.
>
> Cheers,
> Till
>
>
> On Fri, Nov 6, 2015 at 3:58 AM, Nick Dimiduk wrote:
>>
>> Thanks Stephan, I'll check that out in the morning. Generally speaking, it
>> would be great to have some single-jvm example tests for those of us getting
>> started.
>> Following the example of WindowingIntegrationTest is mostly
>> working, though reusing my single sink instance with its static collection
>> results in non-deterministic results; there appears to be a race between
>> instances clearing the collection in their open method and the runtime
>> returning the collection to my test harness.
>>
>> I'd also appreciate some guidance on stream composition. It's nice to use
>> the fluent API when exploring data in a shell, but it seems to me like that
>> API is cumbersome when composing data pipelines of reusable partials. Or
>> maybe I'm doing it all wrong... Hence the request for more examples :)
>>
>> While I'm asking, how might you model this: I have a set of predicates I'd
>> like to flatMap over a stream. An input item should be compared vs every
>> predicate (basically, I want a Clojure juxt of predicates over each stream
>> element). Imagine those predicates expressed as where clauses via the Table
>> API. Say I have hundreds of thousands of these predicates to run over every
>> stream event. Is the java client API rich enough to express such a flow, or
>> should I examine something lower than DataStream?
>>
>> Thanks a lot, and sorry for all the newb questions.
>> -n
>>
>>
>> On Thursday, November 5, 2015, Stephan Ewen wrote:
>>>
>>> Hey!
>>>
>>> There is also a collect() sink in the "flink-streaming-contrib" project,
>>> see here:
>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>
>>> It should work well locally for testing. In that case you can write a
>>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>>> stop reading it once you know the stream is exhausted...
>>>
>>> Stephan
>>>
>>>
>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk wrote:
>>>>
>>>> Hi Robert,
>>>>
>>>> It seems "type" was what I needed.
>>>> It also looks like the test
>>>> jar has an undeclared dependency. In the end, the following allowed me
>>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>
>>>> -n
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>   <version>${flink.version}</version>
>>>>   <type>test-jar</type>
>>>>   <scope>test</scope>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-test-utils</artifactId>
>>>>   <version>${flink.version}</version>
>>>>   <scope>test</scope>
>>>> </dependency>
>>>>
>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger
>>>> wrote:
>>>> > Hi Nick,
>>>> >
>>>> > we are usually publishing the test artifacts. Can you try and replace
>>>> > the <classifier> tag by <type>test-jar</type>:
>>>> >
>>>> > <dependency>
>>>> >   <groupId>org.apache.flink</groupId>
>>>> >   <artifactId>flink-streaming-core</artifactId>
>>>> >   <version>${flink.version}</version>
>>>> >   <type>test-jar</type>
>>>> >   <scope>test</scope>
>>>> > </dependency>
>>>> >
>>>> >
>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk
>>>> > wrote:
>>>> >>
>>>> >> Hello,
>>>> >>
>>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>>> >> produce an input stream of java objects and sink the results into a
>>>> >> collection for verification via JUnit asserts.
>>>> >> StreamExecutionEnvironment provides methods for the former, however,
>>>> >> how to achieve the latter is not evident based on my internet
>>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>>> >> class, ie, as used by WindowingIntegrationTest. However, this class
>>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>>> >> which is not published to maven.
>>>> >>
>>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>>> >> let me know if I've got it wrong.
>>>> >>
>>>> >> Thanks,
>>>> >> Nick
>>>> >>
>>>> >> <dependency>
>>>> >>   <groupId>org.apache.flink</groupId>
>>>> >>   <artifactId>flink-streaming-core</artifactId>
>>>> >>   <version>${flink.version}</version>
>>>> >>   <classifier>test</classifier>
>>>> >>   <scope>test</scope>
>>>> >> </dependency>
>>>> >>
>>>> >
>>>> >
>>>
>>>
>
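
For anyone reading the thread later, the "flatMap instantiated with a list of predicates" idea can be sketched in plain Java. This is only an illustration of the per-element logic, not code from the thread and not Flink API: the class name PredicateFanOut and the method matchingPredicates are hypothetical, and no Flink classes are used. In a real job the loop would presumably sit inside a FlatMapFunction and emit through its Collector instead of returning a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: evaluates every predicate against one element,
// similar in spirit to a Clojure juxt over predicates.
class PredicateFanOut {

    /** Returns the indices of all predicates that accept the element. */
    static <T> List<Integer> matchingPredicates(T element, List<Predicate<T>> predicates) {
        List<Integer> hits = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(element)) {
                hits.add(i); // a flatMap would emit one record per hit here
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<Predicate<Integer>> predicates = new ArrayList<>();
        predicates.add(x -> x > 10);      // index 0
        predicates.add(x -> x % 2 == 0);  // index 1
        predicates.add(x -> x < 0);       // index 2
        System.out.println(matchingPredicates(12, predicates)); // prints [0, 1]
    }
}
```

With hundreds of thousands of predicates this linear scan runs once per stream element, so it is a starting point for a first version rather than a tuned design.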