From: Terry Wang <zjuwangg@gmail.com>
Subject: Re: Flink SQL update-mode set to retract in env file.
Date: Thu, 26 Sep 2019 20:19:56 +0800
To: srikanth flink <flink.devv@gmail.com>
Cc: user@flink.apache.org

Hi, Srikanth~

In your code,
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {}); has converted the resultTable into a DataStream that's unrelated to the Table API,
and the following code `outStreamAgg.addSink(…)` is just a normal DataStream write to a FlinkKafka sink function.
Your program is a mixture of Table API and DataStream programming, not pure Table API.
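To make this concrete, here is a minimal sketch (assuming the Flink 1.9 Java API; `retractStream` is just an illustrative name, and `tableEnv` / `resultTable` are the objects from your snippet): toRetractStream hands back a DataStream<Tuple2<Boolean, Row>>, where f0 flags accumulate vs. retract messages, and everything chained after it is ordinary DataStream programming:

// Minimal sketch only; uses org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.types.Row,
// plus the same `tableEnv` / `resultTable` as in your program.
DataStream<Tuple2<Boolean, Row>> retractStream =
        tableEnv.toRetractStream(resultTable, Row.class);

// From here on this is plain DataStream API, outside the Table API:
retractStream
        .filter(t -> t.f0)           // f0 == true: accumulate message, f0 == false: retract message
        .map(t -> t.f1.toString())   // your map(t -> {...}) sits at exactly this point
        .print();                    // or addSink(...), like your Kafka producer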

Best,
Terry Wang

On Sep 26, 2019, at 5:47 PM, srikanth flink <flink.devv@gmail.com> wrote:

Hi Terry Wang,

Thanks for the quick reply.

I would like to understand more about your line "If the target table is a Kafka table, which implements AppendStreamTableSink, the update-mode will be append only".
If your statement means that retract mode cannot be used for Kafka sinks because they implement AppendStreamTableSink, then how is the code below working for me, dumping data to Kafka:
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {
    Row r = t.f1;
    ObjectNode node = mapper.createObjectNode();
    node.put("source.ip", r.getField(0).toString());
    node.put("destination.ip", r.getField(1).toString());
    node.put("cnt", Long.parseLong(r.getField(2).toString()));
    return node.toString();
});
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(),
        kafkaProducerProperties));

Is it that the above functionality works only with the Table API and not with SQL?
Please explain.

Thanks
Srikanth



On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwangg@gmail.com> wrote:
Hi srikanth~

The Flink SQL update-mode is inferred from the target table type.
For now, there are three StreamTableSink types: `AppendStreamTableSink`, `UpsertStreamTableSink` and `RetractStreamTableSink`.
If the target table is a Kafka table, which implements AppendStreamTableSink, the update-mode will be append only.
So if you want to enable retract mode, you may need to insert into some kind of RetractStreamTableSink.
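As a rough illustration only (not a recommended design; the topic name, producer settings and encoding below are placeholders, and `tableEnv` / `resultTable` stand for whatever StreamTableEnvironment and result Table you have): if the sink has to stay Kafka, you can also convert the result to a retract stream yourself and decide explicitly what a retraction should mean for the topic, e.g. by carrying the add/retract flag in the payload instead of dropping it:

// Rough sketch, assuming the Flink 1.9 DataStream/Kafka APIs; placeholder topic and settings.
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");

tableEnv.toRetractStream(resultTable, Row.class)
        .map(t -> {
            // f0 == true is an accumulate message, f0 == false a retraction;
            // prefix the record so downstream consumers can apply deletes themselves.
            return (t.f0 ? "+" : "-") + t.f1.toString();
        })
        .addSink(new FlinkKafkaProducer<String>("placeholder-topic", new SimpleStringSchema(), props));

Whether that is acceptable depends on what consumes the topic; a true retract sink would have to implement RetractStreamTableSink itself.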
Hope it helps you ~



Best,
Terry Wang

On Sep 26, 2019, at 2:50 PM, srikanth flink <flink.devv@gmail.com> wrote:

How could I configure the environment file for Flink SQL with update-mode: retract?

I have this for append:
    properties:
      - key: zookeeper.connect
        value: localhost:2181
      - key: bootstrap.servers
        value: localhost:9092
      - key: group.id
        value: reconMultiAttempFail
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'a': {
              type: 'string'
            },
            'b': {
              type: 'string'
            },
            'cnt': {
              type: 'string'
            }
          }
        }
      derive-schema: false

    schema:
      - name: 'a'
        type: VARCHAR
      - name: 'b'
        type: VARCHAR
      - name: 'cnt'
        type: BIGINT

Couldn't find any documentation for the same.

Could someone help me with the syntax?

Thanks
Srikanth


