flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
Date Fri, 24 Feb 2017 09:04:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882263#comment-15882263 ]

ASF GitHub Bot commented on FLINK-3679:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102899179
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +
    +/**
    + * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
    + * {@see KeyedSerializationSchema}
    + *
    + * @param <T> The type created by the keyed deserialization schema.
    + */
    +public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    +	/**
    +	 * Deserializes the byte message.
    +	 *
    +	 * @param messageKey the key as a byte array (null if no key has been set)
    +	 * @param message The message, as a byte array. (null if the message was empty or deleted)
    +	 * @param partition The partition the message has originated from
    +	 * @param offset the offset of the message in the original source (for example the Kafka offset)
    +	 *
    +	 * @return The deserialized message as an object.
    +	 */
    +	void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
    +						Collector<T> collector) throws IOException;
    --- End diff --
    
    The indentation of the parameters here seems a bit off.
    Now that the parameter list is quite long, it might be good style to put one parameter per line.
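
For illustration, the suggested layout might look like the sketch below. It reuses the parameter list from the diff, but `Collector` is a minimal local stand-in for `org.apache.flink.util.Collector`, and `ResultTypeQueryable` is left out, so that the snippet compiles without Flink on the classpath. It is not the code that was eventually merged.

```java
import java.io.IOException;
import java.io.Serializable;

/** Minimal stand-in for org.apache.flink.util.Collector (assumption, so the sketch needs no Flink). */
interface Collector<T> {
    void collect(T record);
}

/** Same parameters as in the diff, formatted with one parameter per line. */
interface RichKeyedDeserializationSchema<T> extends Serializable {
    void deserialize(
            byte[] messageKey,
            byte[] message,
            String topic,
            int partition,
            long offset,
            Collector<T> collector) throws IOException;
}
```

With this layout, adding or removing a parameter touches a single line of the diff.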


> DeserializationSchema should handle zero or more outputs for every input
> ------------------------------------------------------------------------
>
>                 Key: FLINK-3679
>                 URL: https://issues.apache.org/jira/browse/FLINK-3679
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Kafka Connector
>            Reporter: Jamie Grier
>            Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one mapping between inputs and outputs.  In reality there are scenarios where one input message (say from Kafka) might actually map to zero or more logical elements in the pipeline.
> Particularly important here is the case where you receive a message from a source (such as Kafka) and, say, the raw bytes don't deserialize properly.  Right now the only recourse is to throw IOException and therefore fail the job.
> This is definitely not good since bad data is a reality and failing the job is not the right option.  If the job fails we'll just end up replaying the bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty set.
> The other case is where one input message should logically be multiple output messages.  This case is probably less important since there are other ways to do this but in general it might be good to make the DeserializationSchema.deserialize() method return a collection rather than a single element.
> Maybe we need to support a DeserializationSchema variant that has semantics more like that of FlatMap.
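
A FlatMap-like variant of the kind described in the issue could be sketched roughly as follows. All names here (`Out`, `FlatDeserializationSchema`, `IntListSchema`) are hypothetical and exist only for illustration; `Out` stands in for Flink's `Collector`, and the comma-separated integer format is an assumed example payload, not something taken from the issue.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for org.apache.flink.util.Collector. */
interface Out<T> {
    void collect(T record);
}

/** Hypothetical flatMap-style schema: each raw message may emit zero or more records. */
interface FlatDeserializationSchema<T> {
    void deserialize(byte[] message, Out<T> out);
}

/** Parses comma-separated integers; malformed messages are skipped instead of failing the job. */
final class IntListSchema implements FlatDeserializationSchema<Integer> {
    @Override
    public void deserialize(byte[] message, Out<Integer> out) {
        if (message == null) {
            return; // empty or deleted message: emit nothing
        }
        String text = new String(message, StandardCharsets.UTF_8);
        List<Integer> parsed = new ArrayList<>();
        try {
            for (String token : text.split(",")) {
                parsed.add(Integer.parseInt(token.trim()));
            }
        } catch (NumberFormatException e) {
            return; // bad data: drop the whole message rather than throw
        }
        // Emit only after the whole message parsed, so a bad message yields no partial output.
        parsed.forEach(out::collect);
    }
}
```

In this sketch one input can produce several records, and a corrupt input produces none, so the job keeps running instead of failing and replaying the same bad data.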



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
