Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8E20B200C32 for ; Thu, 9 Mar 2017 22:09:49 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8C9AE160B75; Thu, 9 Mar 2017 21:09:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B11DF160B5F for ; Thu, 9 Mar 2017 22:09:48 +0100 (CET) Received: (qmail 5022 invoked by uid 500); 9 Mar 2017 21:09:47 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 5012 invoked by uid 99); 9 Mar 2017 21:09:47 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Mar 2017 21:09:47 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 55D16C668F for ; Thu, 9 Mar 2017 21:09:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.879 X-Spam-Level: * X-Spam-Status: No, score=1.879 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 5lMaXQeXi65J for ; Thu, 9 Mar 2017 21:09:46 +0000 (UTC) Received: from mail-yb0-f182.google.com (mail-yb0-f182.google.com [209.85.213.182]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 4D6F45F1EE for ; Thu, 9 Mar 2017 21:09:41 +0000 (UTC) Received: by mail-yb0-f182.google.com with SMTP id a5so5595805ybb.2 for ; Thu, 09 Mar 2017 13:09:41 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:from:date:message-id:subject:to; bh=ewpoCI1wV3KU5/9Rd0qzc21ahDQF+lLkucUZqC6gdiY=; b=upRxnDk+etwSZeQnHhGJLRQ0u9nEoIxkmSTL+bcPS4RhxYnEmhyc1GyyvnFM2hfbBe kp1Al6oUzP8O+RA6hLhyMmGPVSmI8TRv7Z8ox4K7Ol5VEODqEZpu3yoQNlAb8LlFyPIQ VDmWgC5TgK0qoNgTZUDaD2wn9mA6ZUM+2yM1t54fMMEgIU/4S2NgiVBZRKJU7rpHLCRM RSfB+SJ0P82+qLju4XRkGu1IYhAT2X77LFO50sCSiN3rhzJ2jS/PxCunW2/mQUHy8JKa aeV4rR/jug7fqgNkLt6ce2H+qUJ7CwIbjmjB1QzkM6Aaca0Rw4MrR8SBg0KF0PivMNQg q6OQ== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:from:date:message-id:subject:to; bh=ewpoCI1wV3KU5/9Rd0qzc21ahDQF+lLkucUZqC6gdiY=; b=ZXrI4ZcIAX87S3hXD5EQUAtrKH7VnaWeBu+nsCKH0jQwE2o8DCL6/QjQvcp0/Tfpaf vU3uw17Pkz5CKvjtpzV+H2L8ifhkE4b4g4akMWycUMpYhYHt+X5xUAYqIDx7RBL6Dtyo 0SErWHf4FUpEdAc53n2N6khdmHKSxOSSAdbBMwqmwdGMzTlOuYWYEyqsE0Jmk1jEPQ35 4f4ELgV0l7mcyhpmBe88iqlIjDeeDysuovrhNXaVC3/O1DSGPfkbucYDgM6N/AWB8QT+ OjBaOFxS1jZQG50HN24QAiUdAV+tnwlwoon6qlJtRycFjloVPWMsBCaYA5osmqtuW0Mq y7uw== X-Gm-Message-State: AMke39nRT3yKhuZOX1JvxL/0qTY14J5Hzt4O3I0RYi7kDxNd1Zg5S9qLSZc3/VjHFwNcBCDoM0kYJnBedeHSgg== X-Received: by 10.37.101.8 with SMTP id z8mr5466909ybb.150.1489093775394; Thu, 09 Mar 2017 13:09:35 -0800 (PST) MIME-Version: 1.0 Received: by 10.37.75.132 with HTTP; Thu, 9 Mar 2017 13:09:34 -0800 (PST) From: MAHESH KUMAR Date: Thu, 9 Mar 2017 14:09:34 -0700 Message-ID: Subject: Flink - Writing Test Case for the Datastream To: user@flink.apache.org Content-Type: multipart/alternative; boundary=001a1142f506851aa5054a52a998 archived-at: Thu, 09 Mar 2017 21:09:49 -0000 --001a1142f506851aa5054a52a998 Content-Type: text/plain; charset=UTF-8 Hi Team, I am trying to write test cases to check whether the job is getting executed as desired. I am using the Flink test util. I am trying to do a end to end testing where Flink reads from a Kafka Queue, does some processing and then writes the output to another topic of the Kafka Queue. My objective is to read the message from the output topic and check if it has the same message as expected. I have got Zookeeper and Kafka configured for the test. When I start the Flink Job, it never terminates since it's source is a Kafka Source. Is there a way to run a job for a specific interval of time or how do I go about testing this scenario. Is there any documentation/example for running test cases such as these? My code currently looks something like this: class StreamingMultipleTest extends StreamingMultipleProgramsTestBase { @Before def initialize() = { // Start Kafka, Zookeeper // Call the run method of the Flink Class - FlinkClass.run() // This class contains the env.execute() // My code does not execute any further since the previous call is never returned. } @Test def Test1() = { // Check if the Output Topic of the Kafka Queue is as expected - AssertStatement } @After def closeServices() = { // Stop Zookeeper and Kafka } } Thanks and Regards, Mahesh -- Mahesh Kumar Ravindranathan Data Streaming Engineer Oracle Marketing Cloud - Social Platform Contact No:+1(720)492-4445 --001a1142f506851aa5054a52a998 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Hi Team,

I am trying to write test case= s to check whether the job is getting executed as desired. I am using the F= link test util. I am trying to do a end to end testing where Flink reads fr= om a Kafka Queue, does some processing and then writes the output to anothe= r topic of the Kafka Queue. My objective is to read the message from the ou= tput topic and check if it has the same message as expected.

I have got Zookeeper and Kafka configured for = the test. When I start the Flink Job, it never terminates since it's so= urce is a Kafka Source. Is there a way to run a job for a specific interval= of time or how do I go about testing this scenario. Is there any documenta= tion/example for running test cases such as these?

My code currently looks something like this:

clas= s StreamingMultipleTest extends StreamingMultipleProgramsTestBase=C2=A0
{

@Before def initialize() =3D {
<= div>// Start Kafka, Zookeeper
// Call the run method of the Flink= Class - FlinkClass.run() =C2=A0// This class contains the env.execute()

// My code does not execute any further since the pr= evious call is never returned.
}

@Test d= ef Test1() =3D {
// Check if the Output Topic of the Kafka Queue = is as expected - AssertStatement

}

<= /div>
@After def closeServices() =3D {
// Stop Zookeeper and = Kafka
}

}


<= /div>
Thanks and Regards,
Mahesh

--
<= div class=3D"gmail_signature">

Mahes= h Kumar Ravindranathan =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0=C2=A0
Data Streaming Engineer=C2= =A0
Oracle Marketing Cloud - Social Pl= atform
Contact No:+1(720)492-4445

--001a1142f506851aa5054a52a998--