From: Aljoscha Krettek
Date: Thu, 26 May 2016 07:31:02 +0000
Subject: Re: Junit Issue while testing Kafka Source
To: dev@flink.apache.org

Hi,

what we are doing in most internal tests is to verify in a sink whether the data is correct and then throw a SuccessException. This brings down the job, and we check whether we catch a SuccessException to verify that the test was successful. Look, for example, at the ValidatingSink in EventTimeWindowCheckpointingITCase in the Flink source.

Cheers,
Aljoscha

On Thu, 26 May 2016 at 01:58 Nick Dimiduk wrote:

> I'm also curious for a solution here. My test code executes the flow from a
> separate thread. Once I've joined on all my producer threads and I've
> verified the output, I simply interrupt the flow thread. This spews
> exceptions, but it all appears to be harmless.
>
> Maybe there's a better way? I think you'd need some "death pill" to send
> into the stream that signals its termination.
>
> On Tue, May 24, 2016 at 7:29 PM, Vinay Patil wrote:
>
> > Hi,
> >
> > I am able to read from a topic using FlinkKafkaConsumer and return the
> > result. However, when I test this scenario in JUnit, the result is
> > printed (kafkaStream.print()) but I am not able to exit the job;
> > env.execute keeps running.
> > I tried to return env.execute from the method, but that did not work either.
> >
> > 1) Is there any way to end the execution of the job forcefully?
> > 2) How do I test whether the data has come from the topic?
> >
> > - One way I can think of is to capture the output of stream.print() in a
> > PrintStream and check the result (but I am not able to test this since
> > the job does not exit).
> >
> > Please help with these issues.
> >
> > Regards,
> > Vinay Patil
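
The SuccessException approach described in the reply can be sketched in plain Java roughly as follows. This is only an illustration of the control flow under stated assumptions, not the code from the Flink source: it uses no Flink classes, and runJob, endedWithSuccess, and runTest are hypothetical helper names. In a real test the sink would be a Flink sink function and runJob would be the env.execute() call, which is assumed here to surface the sink's exception somewhere in the cause chain of the exception it throws.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Marker exception thrown by the sink once all expected records have been seen. */
class SuccessException extends RuntimeException {
    SuccessException() { super("all expected records seen"); }
}

/** Sink that checks each record against the expected data and signals success by throwing. */
class ValidatingSink implements Consumer<String> {
    private final List<String> expected;
    private int seen = 0;

    ValidatingSink(List<String> expected) { this.expected = expected; }

    @Override
    public void accept(String record) {
        if (!expected.get(seen).equals(record)) {
            throw new IllegalStateException("unexpected record: " + record);
        }
        seen++;
        if (seen == expected.size()) {
            throw new SuccessException(); // deliberately brings the job down
        }
    }
}

class SuccessExceptionSketch {
    /** Hypothetical stand-in for env.execute(): runs the source and wraps any sink failure. */
    static void runJob(Iterator<String> source, Consumer<String> sink) throws Exception {
        try {
            while (source.hasNext()) {
                sink.accept(source.next());
            }
        } catch (RuntimeException e) {
            throw new Exception("job execution failed", e);
        }
    }

    /** Walks the cause chain looking for the success marker. */
    static boolean endedWithSuccess(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    /** Runs an unbounded source (like a Kafka topic) against the validating sink. */
    static boolean runTest(List<String> records, List<String> expected) {
        Iterator<String> endless = new Iterator<String>() {
            private int i = 0;
            @Override public boolean hasNext() { return true; } // never ends on its own
            @Override public String next() { return records.get(i++ % records.size()); }
        };
        try {
            runJob(endless, new ValidatingSink(expected));
            return false; // job ended without the success signal
        } catch (Exception e) {
            return endedWithSuccess(e);
        }
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        System.out.println(runTest(data, data));                     // prints true
        System.out.println(runTest(data, Arrays.asList("a", "x")));  // prints false
    }
}
```

The point of the design is that the sink both validates the data and terminates the otherwise endless job, so the test needs neither a thread interrupt nor a captured PrintStream; any other exception, or a normal return, counts as a failed test.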