Date: Mon, 8 Jan 2018 18:42:00 +0000 (UTC)
From: "Kenneth Knowles (JIRA)"
To: commits@beam.apache.org
Subject: [jira] [Commented] (BEAM-3423) Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"

    [ https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316763#comment-16316763 ]

Kenneth Knowles commented on BEAM-3423:
---------------------------------------

I just mean that this bug does not have to do with triggers. Your MyKV probably works because it uses SerializableCoder, which can encode null values.
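The coder asymmetry described in this comment can be illustrated without Beam. The sketch below is a stdlib-only model, not Beam code. `encodeKvStyle` is a hypothetical stand-in that, like `KvCoder` in the quoted stack trace, writes a key and then a value, so it has no encoding for a null pair and rejects it. `encodeJavaSerialization` uses `ObjectOutputStream`, the Java serialization mechanism that `SerializableCoder` relies on, which writes an explicit null marker. `MyKV` here is a hypothetical pair class, not the reporter's actual `MyKV`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stdlib-only model (not Beam code) of KV-style vs. Java-serialization null handling. */
public class NullEncodingSketch {

  /** Hypothetical stand-in for a user-defined MyKV: a plain Serializable pair. */
  public static final class MyKV implements Serializable {
    private static final long serialVersionUID = 1L;
    public final String key;
    public final String value;

    public MyKV(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /**
   * Hypothetical KV-style encoder: writes the key, then the value. No byte sequence
   * means "the whole pair is null", so a null pair is rejected, mirroring the
   * "cannot encode a null KV" check in the stack trace (Beam's CoderException is an IOException).
   */
  public static byte[] encodeKvStyle(MyKV kv) throws IOException {
    if (kv == null) {
      throw new IOException("cannot encode a null KV");
    }
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(kv.key);   // this sketch assumes a non-null key
    out.writeUTF(kv.value); // and a non-null value
    out.flush();
    return bytes.toByteArray();
  }

  /** Java serialization: writeObject(null) emits a null marker instead of failing. */
  public static byte[] encodeJavaSerialization(Object value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  /** Reads back whatever encodeJavaSerialization wrote, including null. */
  public static Object decodeJavaSerialization(byte[] data)
      throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  /** True if the KV-style encoder refuses a null pair with the expected message. */
  public static boolean kvStyleRejectsNull() {
    try {
      encodeKvStyle(null);
      return false;
    } catch (IOException e) {
      return "cannot encode a null KV".equals(e.getMessage());
    }
  }

  /** True if Java serialization encodes null and decodes it back to null. */
  public static boolean javaSerializationRoundTripsNull() {
    try {
      return decodeJavaSerialization(encodeJavaSerialization(null)) == null;
    } catch (IOException | ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("KV-style encoder rejects null: " + kvStyleRejectsNull());
    System.out.println("Java serialization round-trips null: " + javaSerializationRoundTripsNull());
    MyKV back = (MyKV) decodeJavaSerialization(encodeJavaSerialization(new MyKV("k", "a")));
    System.out.println("Non-null pair round-trips: " + back.key + "=" + back.value);
  }
}
```

Running `main` should report that the KV-style encoder rejects null while Java serialization round-trips it. This is the same asymmetry the comment describes between `KvCoder` and `SerializableCoder`.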
> Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"
> ---------------------------------------------------------------------------------
>
> Key: BEAM-3423
> URL: https://issues.apache.org/jira/browse/BEAM-3423
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.2.0
> Environment: ubuntu16.04, idea, java8
> Reporter: huangjianhuang
> Assignee: Kenneth Knowles
>
> My code is as follows:
> {code:java}
> pipeline
>     // Read data
>     .apply("Read from kafka",
>         KafkaIO.<String, String>read()
>             .withBootstrapServers("localhost:9092")
>             .withTopic(topic)
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata()
>     )
>     .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>         .discardingFiredPanes().withAllowedLateness(Duration.ZERO))
>     // works fine
>     // .apply(Distinct.create())
>     // oops! -> CoderException: cannot encode a null KV
>     .apply(Distinct.withRepresentativeValueFn(new Val()).withRepresentativeType(TypeDescriptors.strings()))
>     .apply(MapElements.into(TypeDescriptors.nulls())
>         .via(input -> {
>             System.out.println(Instant.now());
>             System.out.println(input);
>             return null;
>         }));
>
> private static class Val implements SerializableFunction<KV<String, String>, String> {
>     @Override
>     public String apply(KV<String, String> input) {
>         return input.getValue();
>     }
> }
> {code}
>
> Input words sent to Kafka:
> word1
> // after 10s
> word2
>
> Then I got the following exception:
> {code:java}
> begin
> 2018-01-06T11:18:52.971Z
> KV{null, a}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
>     at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37)
> Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
>     at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
>     at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
>     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
>     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> But if I use .apply(Distinct.create()), it works fine.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)