Date: Wed, 25 Jan 2017 20:47:26 +0000 (UTC)
From: "Francesco Lemma (JIRA)"
To: dev@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: [jira] [Comment Edited] (KAFKA-4691) ProducerInterceptor.onSend() is called after key and value are serialized

    [ https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838538#comment-15838538 ]

Francesco Lemma edited comment on KAFKA-4691 at 1/25/17 8:46 PM:
-----------------------------------------------------------------

Thanks for commenting, Matthias.
The fact that there's a single producer which uses byte[] for key and value isn't necessarily a limitation. The problem is that the interceptor is called after the serialization happens. If the interceptor were called before (as it should be according to the JavaDoc), then it could manipulate the message (because the message is not yet serialized) and, regardless of the type, the manipulated message would then be serialized to a byte[] by the RecordCollector and would be compatible with the single Kafka Streams producer.
Am I missing something here? I think the problem is where the interceptors are triggered.


was (Author: francescolemma):
Thanks for commenting, Matthias.
The fact that there's a single producer which uses byte[] for key and value isn't necessarily a limitation. The problem is that the interceptor is called after the serialization happens. If the interceptor is called before (as it should be according to the JavaDoc), then it can manipulate the message (because the message is not yet serialized) and, regardless of the type, the manipulated message will then be serialized to a byte[] by the RecordCollector and will be compatible with the single Kafka Streams producer.
Am I missing something here? I think the problem is where the interceptors are triggered.

> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html), "This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord)".
> However, when using this with Kafka Streams (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)), the key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside RecordCollectionImpl.send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer, StreamPartitioner partitioner), that is, before calling the send method of the producer, which is what triggers the interceptor.
> This makes it impossible for the interceptor to perform any operation involving the key or value of the message without first performing an additional deserialization.
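
The ordering problem reported here can be modelled without any Kafka dependency. The sketch below is only an illustration: InterceptorOrderDemo, serialize() and describe() are hypothetical stand-ins, not Kafka classes or methods, and the String/UTF-8 serializer is an assumption made for the example. It shows that an interceptor placed after serialization only ever sees a byte[], and that recovering the original value costs an extra deserialization.

```java
import java.nio.charset.StandardCharsets;

// Simplified model of the call order; none of these names are Kafka APIs.
final class InterceptorOrderDemo {

    // Hypothetical serializer: String -> UTF-8 bytes.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical interceptor hook: reports the type of value it was handed.
    static String describe(Object value) {
        return value instanceof byte[] ? "byte[]" : value.getClass().getSimpleName();
    }

    // Order described in the JavaDoc: the interceptor runs before serialization.
    static String interceptThenSerialize(String value) {
        String seen = describe(value);   // interceptor sees the typed value
        serialize(value);                // serialization happens afterwards
        return seen;
    }

    // Order reported in this issue: serialization runs before the interceptor.
    static String serializeThenIntercept(String value) {
        byte[] bytes = serialize(value); // value is serialized first
        return describe(bytes);          // interceptor only sees raw bytes
    }

    // Workaround mentioned in the issue: deserialize again inside the interceptor.
    static String interceptWithExtraDeserialization(String value) {
        byte[] bytes = serialize(value);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(interceptThenSerialize("order-42"));
        System.out.println(serializeThenIntercept("order-42"));
        System.out.println(interceptWithExtraDeserialization("order-42"));
    }
}
```

In this model the first call reports String and the second reports byte[], which is the difference between the documented and the observed behaviour. The third call gets the original value back only because the interceptor knows which deserializer to apply.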