beam-commits mailing list archives

From "Kenneth Knowles (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (BEAM-3423) Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"
Date Mon, 08 Jan 2018 18:42:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-3423:
----------------------------------
    Summary: Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV"  (was: Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions)

> Distinct.withRepresentativeValueFn throws CoderException "cannot encode null KV" 
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-3423
>                 URL: https://issues.apache.org/jira/browse/BEAM-3423
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.2.0
>         Environment: ubuntu16.04, idea, java8
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
>
> My code is as follows:
> {code:java}
> pipeline
>                 //Read data
>                 .apply("Read from kafka",
>                         KafkaIO.<String, String>read()
>                                 .withBootstrapServers("localhost:9092")
>                                 .withTopic(topic)
>                                 .withKeyDeserializer(StringDeserializer.class)
>                                 .withValueDeserializer(StringDeserializer.class)
>                                 .withoutMetadata()
>                 )
>                 .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
>                         .triggering(AfterWatermark.pastEndOfWindow()
>                                 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                         .discardingFiredPanes().withAllowedLateness(Duration.ZERO))
>                 // works fine
> //                .apply(Distinct.create())
>                 // oops! -> CoderException: cannot encode a null KV
>                 .apply(Distinct.withRepresentativeValueFn(new Val()).withRepresentativeType(TypeDescriptors.strings()))
>                 .apply(MapElements.into(TypeDescriptors.nulls())
>                         .via(input -> {
>                             System.out.println(Instant.now());
>                             System.out.println(input);
>                             return null;
>                         }));
>     private static class Val implements SerializableFunction<KV<String, String>, String> {
>         @Override
>         public String apply(KV<String, String> input) {
>             return input.getValue();
>         }
>     }
> {code}
> I sent these words to Kafka:
> word1
> (10 seconds later)
> word2
> and then got the following exception:
> {code:java}
> begin
> 2018-01-06T11:18:52.971Z
> KV{null, a}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
> 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
> 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
> 	at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37)
> Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
> 	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
> 	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
> 	at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
> 	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
> 	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
> 	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
> 	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
> 	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
> 	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
> 	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
> 	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
> 	at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
> 	at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
> 	at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
> 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
> 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
> 	at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
> 	at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> But if I use .apply(Distinct.create()), it works fine.
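The trace suggests where the null comes from: the nested `KvCoder.encode` frames (line 73 calling line 70) point at a null *element* KV inside the `KV<representative, element>` that `Combine$GroupedValues` emits, not at the outer KV. Below is a plain-Java model of that reading, not Beam code and not Beam's implementation. It assumes (1) the transform keys each element by its representative value and keeps one element per key with a combine whose result for an empty pane is null, and (2) with early firings plus `discardingFiredPanes()`, the on-time pane at the end of the window still fires when no new data arrived. The class and method names (`EmptyPaneNullModel`, `keepFirst`, `encode`) are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Plain-Java model, NOT Beam code: all names here are hypothetical.
public class EmptyPaneNullModel {

    // Stand-in for a "keep the first value" combine over one pane.
    // Assumption: an empty pane yields null (a null combine identity).
    static <T> T keepFirst(List<T> pane) {
        return pane.isEmpty() ? null : pane.get(0);
    }

    // Stand-in for the nested KvCoder null check on the element KV.
    static String encode(String representative, Map.Entry<String, String> element) {
        if (element == null) {
            throw new IllegalStateException("cannot encode a null KV");
        }
        return "KV{" + representative + ", KV{" + element.getKey() + ", " + element.getValue() + "}}";
    }

    public static void main(String[] args) {
        // Kafka records without a key show up as KV{null, value}.
        Map.Entry<String, String> word1 = new SimpleEntry<>(null, "word1");
        // Pane 1: early firing carrying word1. Pane 2: on-time firing at the end
        // of the window with no new data (discarding mode), assumed to fire anyway.
        List<List<Map.Entry<String, String>>> panes = Arrays.asList(
                Collections.singletonList(word1),
                Collections.<Map.Entry<String, String>>emptyList());
        for (List<Map.Entry<String, String>> pane : panes) {
            try {
                System.out.println(encode("word1", keepFirst(pane)));
            } catch (IllegalStateException e) {
                System.out.println("pane failed: " + e.getMessage());
            }
        }
    }
}
```

Running `main` prints `KV{word1, KV{null, word1}}` for the early pane and `pane failed: cannot encode a null KV` for the empty on-time pane. If this reading is right, it would also fit the observation that `Distinct.create()` works: that transform keys each element by itself with a `Void` value, so an empty pane would at worst produce a null `Void`, which is presumably encodable.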



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
