Date: Wed, 1 Mar 2017 04:14:45 +0000 (UTC)
From: "Raghu Angadi (JIRA)"
To: commits@beam.incubator.apache.org
Subject: [jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

[ https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889471#comment-15889471 ]

Raghu Angadi commented on BEAM-1573:
------------------------------------

[~peay], it is not clear what you are trying to do that is not possible now. Can you paste (pseudo) code for either a Beam pipeline (or even a simple {{consume(KafkaRecord record)}} method) to illustrate what you want to do? Also, is this for the source or the sink?

Note that if you don't call {{.withoutMetadata()}} on {{KafkaIO.read()}}, it returns {{KafkaRecord}}, which contains the topic, partition, and other metadata.
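To illustrate the point about metadata: a minimal, self-contained sketch of what a {{consume(KafkaRecord record)}} method could look like. {{KafkaRecord}} below is a simplified stand-in for Beam's {{org.apache.beam.sdk.io.kafka.KafkaRecord}} (the type {{KafkaIO.read()}} emits when {{.withoutMetadata()}} is not applied), not the real class; field names and shapes are assumptions for illustration only.

```java
public class KafkaRecordSketch {
    // Stand-in for the metadata-carrying record type that KafkaIO.read()
    // emits; the real KafkaRecord exposes topic, partition, offset, etc.
    static final class KafkaRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] value;

        KafkaRecord(String topic, int partition, long offset, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    // Example consumer that uses the per-record metadata directly,
    // without needing the topic to be configured anywhere else.
    static String consume(KafkaRecord record) {
        return record.topic + "/" + record.partition + "@" + record.offset
                + " (" + record.value.length + " bytes)";
    }

    public static void main(String[] args) {
        KafkaRecord r = new KafkaRecord("events", 3, 42L, new byte[]{1, 2});
        System.out.println(consume(r)); // prints events/3@42 (2 bytes)
    }
}
```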
> KafkaIO does not allow using Kafka serializers and deserializers
> ----------------------------------------------------------------
>
>                 Key: BEAM-1573
>                 URL: https://issues.apache.org/jira/browse/BEAM-1573
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>    Affects Versions: 0.4.0, 0.5.0
>            Reporter: peay
>            Assignee: Raghu Angadi
>            Priority: Minor
>
> KafkaIO does not allow overriding the serializer and deserializer settings of the Kafka consumers and producers it uses internally. Instead, it allows setting a `Coder`, and has a simple Kafka serializer/deserializer wrapper class that calls the coder.
> I appreciate that supporting Beam coders is good and consistent with the rest of the system. However, is there a reason to completely disallow custom Kafka serializers?
> This is a limitation when working with an Avro schema registry, for instance, which requires custom serializers. One can write a `Coder` that wraps a custom Kafka serializer, but that means two levels of unnecessary wrapping.
> In addition, the `Coder` abstraction is not equivalent to Kafka's `Serializer`, which receives the topic name as input. Using a `Coder` wrapper would require duplicating the output topic setting in the argument to `KafkaIO` and when building the wrapper, which is inelegant and error-prone.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
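The topic-duplication problem described in the issue can be sketched as follows. `Serializer` and `Coder` here are simplified stand-ins for the Kafka and Beam interfaces (not the real signatures); the point is that because a coder's encode step has no topic parameter, wrapping a topic-aware serializer forces the topic to be fixed at construction time, repeating a setting the sink already has.

```java
import java.nio.charset.StandardCharsets;

public class CoderWrapSketch {
    // Stand-in for Kafka's Serializer: the topic is passed per call.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a Beam Coder: encoding has no topic parameter.
    interface Coder<T> {
        byte[] encode(T value);
    }

    // Wrapping a topic-aware Serializer in a Coder requires baking the
    // topic in here -- duplicating the topic already configured on the
    // sink, which is the error-prone duplication the issue points out.
    static <T> Coder<T> wrap(Serializer<T> serializer, String topic) {
        return value -> serializer.serialize(topic, value);
    }

    public static void main(String[] args) {
        Serializer<String> s =
                (topic, data) -> (topic + ":" + data).getBytes(StandardCharsets.UTF_8);
        // The topic "events" must be repeated here AND in the sink config.
        Coder<String> coder = wrap(s, "events");
        System.out.println(new String(coder.encode("hello"), StandardCharsets.UTF_8));
        // prints events:hello
    }
}
```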