Subject: Re: Destroy StreamExecutionEnv
From: Stephan Ewen
To: user@flink.apache.org
Date: Mon, 5 Oct 2015 11:32:37 +0200

Matthias' solution should work in most cases.

In cases where you do not control the source (or the source can never be finite, like the Kafka source), we often use a trick in the tests, which is throwing a special type of exception (a SuccessException).

You can catch this exception on env.execute() (it is the nested cause) and decide that this qualifies the test as successful...

Greetings,
Stephan

On Mon, Oct 5, 2015 at 11:20 AM, Matthias J. Sax <mjsax@apache.org> wrote:
> Hi,
>
> you just need to terminate your source (i.e., return from the run() method if
> you implement your own source function). This will finish the complete
> program. For already available sources, just make sure you read finite
> input.
>
> Hope this helps.
>
> -Matthias
>
> On 10/05/2015 12:15 AM, jay vyas wrote:
> > Hi folks.
> >
> > How do we end a stream execution environment?
> >
> > I have a unit test which runs a streaming job, and want the unit test to
> > die after the first round of output is processed...
> >
> > DataStream<Tuple2<Map, Integer>> counts =
> >     dataStream.map(
> >         new MapFunction<String, Tuple2<Map, Integer>>() {
> >             @Override
> >             public Tuple2<Map, Integer> map(String s) throws Exception {
> >                 Map transaction = MAPPER.readValue(s, Map.class);
> >                 return new Tuple2<>(transaction, 1);
> >             }
> >         });
> > counts.print();
> >
> > --
> > jay vyas
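To make the SuccessException trick concrete, here is a minimal sketch in plain Java, with no Flink dependency so it runs on its own. Everything in it is an assumption for illustration: `runEndlessJob` is only a stand-in for env.execute(), `SuccessException` is a hypothetical marker class defined locally, and the wrapping `Exception` merely imitates the way a job failure surfaces with the original exception as a nested cause.

```java
import java.util.function.Consumer;
import java.util.stream.Stream;

public class SuccessExceptionSketch {

    /** Hypothetical marker exception: thrown once the test has seen enough output. */
    static class SuccessException extends RuntimeException {
        SuccessException() {
            super("test condition reached");
        }
    }

    /** Stand-in for env.execute(): drives an endless source and wraps any failure. */
    static void runEndlessJob(Consumer<Long> sink) throws Exception {
        try {
            // This source never finishes on its own.
            Stream.iterate(0L, n -> n + 1).forEach(sink);
        } catch (RuntimeException e) {
            // The original exception is kept as the nested cause.
            throw new Exception("Job execution failed", e);
        }
    }

    /** Walks the cause chain looking for the marker exception. */
    static boolean causedBySuccess(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    /** Runs the job until `limit` records were seen; true if it ended via the marker. */
    static boolean runUntil(long limit) {
        long[] seen = {0};
        try {
            runEndlessJob(value -> {
                if (++seen[0] >= limit) {
                    throw new SuccessException();
                }
            });
            return false; // not reached for an endless source
        } catch (Exception e) {
            return causedBySuccess(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runUntil(3));
    }
}
```

In a real test the check in `causedBySuccess` would be applied to whatever env.execute() throws, and any other exception would be rethrown so that genuine failures still fail the test.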
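Matthias' suggestion (return from run() so the program finishes) can be sketched the same way. The `Source` interface below is a simplified, hypothetical stand-in and not Flink's actual source interface; `execute` only imitates a job that ends once the source has no more input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class FiniteSourceSketch {

    /** Simplified stand-in for a source function; not Flink's actual interface. */
    interface Source<T> {
        void run(Consumer<T> collector);

        void cancel();
    }

    /** Emits a fixed list of records and then returns from run(). */
    static class FiniteSource implements Source<String> {
        private final List<String> records;
        private volatile boolean running = true;

        FiniteSource(List<String> records) {
            this.records = records;
        }

        @Override
        public void run(Consumer<String> collector) {
            for (String record : records) {
                if (!running) {
                    return; // cancelled early
                }
                collector.accept(record);
            }
            // Returning here signals that there is no more input.
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Stand-in for job execution: finishes as soon as the source's run() returns. */
    static List<String> execute(Source<String> source) {
        List<String> out = new ArrayList<>();
        source.run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(execute(new FiniteSource(Arrays.asList("a", "b", "c"))));
    }
}
```

For a unit test this is usually the simpler option when you own the source: feed it a small fixed input and let the job end by itself.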