From: Timothy Victor
Date: Fri, 8 Mar 2019 06:28:09 -0600
Subject: Flink 1.7.1 KafkaProducer error using Exactly Once semantic
To: user@flink.apache.org

Yesterday I came across a weird problem when attempting to run two nearly identical jobs on a cluster. I was able to solve it (or rather work around it), but am sharing it here so we can consider a potential fix in Flink's KafkaProducer code.

My scenario is as follows. I have a Flink program that reads from a Kafka topic, does a simple Map() operation, and writes to a Kafka topic with exactly-once semantics. The source and sink topics are configurable, and the Map() operation is also configurable (i.e. based on CLI arguments it can choose between a set of Map() operations). The job does not name any of its operators (this part sounds trivial, right? ...but read on). I run an instance of this program (job) on a Flink standalone cluster running 1.7.1. The cluster has 12 TMs with 1 slot each, so each job runs in its own task slot/TM and hence in its own JVM process. The job runs fine, checkpointing regularly with no errors. However, if I start another instance of the program (with different source/sink topics), then within a few seconds the first one fails and enters recovery. The first one eventually fails permanently (all retries exhausted). If I then restart the failed job, the second job fails within a few seconds. So basically one job was tripping over the other, which was especially odd since each job was running in essentially its own JVM process (i.e. Task Manager / Task Slot).

Looking at the Flink logs, I saw this error message:

>> "org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."

So I looked at the transaction IDs and saw that they were of the form "transaction.id = Source: Custom Source -> Map -> Sink: Unnamed-<guid>"; essentially the transaction.id is set to the description of the chained operator followed by some GUID. It is not clear to me how the GUID is generated, but essentially BOTH of my jobs were using the same transaction.id!

If I understand correctly, Flink uses a pool of KafkaProducers, and each KafkaProducer within the pool has a transaction.id associated with it. I think what is happening is that each of my jobs has its own pool of KafkaProducers, but each producer in one pool has the same ID as a producer in the other pool -- something like JobA.Pool: {P1, P2, P3}, JobB.Pool: {P1, P2, P3}. That sounds like it should not be a problem, since each pool lives in its own JVM process. But since it does break, my conjecture is this: with the way two-phase commit works, in the _commit_ phase the JM sends a signal to each operator to commit its state. My guess is that because the IDs collide, a producer in one pool is told to commit a transaction with an epoch that actually belongs to the same-named producer in the other pool, and which happens to be lower than its own latest epoch. For example, P1 (in Job A) gets a message to commit with epoch 0 that is actually meant for P1 (in Job B). The only other explanation I can think of is that these pools are in fact shared between task managers -- but that's really hard to believe.

Is my understanding correct?
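For reference, here is roughly what each of the two jobs looks like. This is a minimal sketch, not my exact code: the topic names, broker address, group id, and the map function are placeholders, and I'm showing the universal Kafka connector.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class KafkaPassThroughJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);  // EXACTLY_ONCE commits its Kafka transactions on checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "job-a");
        // keep the producer transaction timeout below the broker's transaction.max.timeout.ms
        props.setProperty("transaction.timeout.ms", "900000");

        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer<>("source-topic-a", new SimpleStringSchema(), props));

        input
            .map(new MapFunction<String, String>() {   // stand-in for the configurable Map()
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            // note: no .name(...) anywhere, matching the setup described above
            .addSink(new FlinkKafkaProducer<>(
                    "sink-topic-a",
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("job-a");
    }
}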
I was able to solve this by simply naming one of my operators so that the transaction.id is unique for each job. For example:

JobA transaction.id = "Source: Custom Source -> (JobA) -> Sink: Unnamed-<guid>"
JobB transaction.id = "Source: Custom Source -> (JobB) -> Sink: Unnamed-<guid>"

After I did this, both jobs ran successfully.

I think a good improvement would be to _not_ use the job graph description as the transaction ID. Maybe a simple approach is to require the user to provide a pool identifier when using exactly-once with Kafka. At least that would make the requirement explicit.

Thanks

Tim
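P.S. For anyone who runs into the same thing, the workaround amounts to one extra line in the sketch above. The name "JobA map" is just an example; any string that differs between the two jobs does the trick, since it changes the chained operator description that the transaction.id is derived from.

input
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    })
    .name("JobA map")                       // <-- the added per-job operator name
    .addSink(new FlinkKafkaProducer<>(
            "sink-topic-a",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE));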