Date: Sun, 29 Mar 2020 07:08:01 -0500 (CDT)
From: KristoffSC
To: user@flink.apache.org
Subject: Re: Testing RichAsyncFunction with TestHarness

Hi,

another update on this one. I managed to make the workaround a little cleaner. My test setup now looks like this:

    // Create the mock environment first, so that its ExecutionConfig is
    // available when the serializer is constructed.
    MockEnvironment environment = MockEnvironment.builder().build();
    ExecutionConfig executionConfig = environment.getExecutionConfig();

    // Java-serialize an empty list of stream edges.
    ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosStreamEdges = new ObjectOutputStream(streamEdgesBytes);
    oosStreamEdges.writeObject(Collections.emptyList());
    oosStreamEdges.flush();

    // Java-serialize the Kryo serializer for the input type.
    KryoSerializer<MyMessage> kryoSerializer = new KryoSerializer<>(
        MyMessage.class, executionConfig);
    ByteArrayOutputStream kryoSerializerBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosKryoSerializer = new ObjectOutputStream(kryoSerializerBytes);
    oosKryoSerializer.writeObject(kryoSerializer);
    oosKryoSerializer.flush();

    // Put both byte arrays into the task configuration under these keys.
    Configuration configuration = new Configuration();
    configuration.setBytes("edgesInOrder", streamEdgesBytes.toByteArray());
    configuration.setBytes("typeSerializer_in_1", kryoSerializerBytes.toByteArray());
    environment.getTaskConfiguration().addAll(configuration);

    this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.UNORDERED),
        environment);

With this setup, this.testHarness.open(); works.
However, there is another problem. When calling

    testHarness.processElement(myMessage, 1L);

it throws another exception:

    java.lang.AssertionError
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:400)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
        at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:112)
        at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:107)
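As a side note on the setup: the two ObjectOutputStream blocks repeat the same steps, so they could probably be factored into a small helper. Below is a minimal sketch using only the JDK. The class and method names (JavaSerializationSketch, toBytes, fromBytes) are made up for illustration and are not part of Flink; the round trip through fromBytes is only there to check that the bytes are readable again.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;

// Hypothetical helper class, not part of Flink.
public class JavaSerializationSketch {

    /** Serializes a value to a byte array with plain Java serialization. */
    public static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into 'bytes'.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Reads back a value that was written by toBytes. */
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Same payload as the "edgesInOrder" entry in the setup: an empty list.
        byte[] edgesBytes = toBytes(Collections.emptyList());
        Object restored = fromBytes(edgesBytes);
        System.out.println(restored.equals(Collections.emptyList()));
    }
}
```

With such a helper, the configuration lines in the setup would shrink to something like configuration.setBytes("edgesInOrder", toBytes(Collections.emptyList())); and the same for the serializer entry.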