From: Fabian Hueske
Date: Wed, 18 Sep 2019 14:22:59 +0200
Subject: Re: Time Window Flink SQL join
To: Nishant Gupta
Cc: user

Hi,

The query that you wrote is not a time-windowed join.

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;

The problem is the use of CURRENT_TIMESTAMP instead of a processing time (or event time) attribute of badips.
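To illustrate the point above, here is a minimal Python sketch of an event-time interval join. It is a simplified model, not Flink's implementation, and every name in it is hypothetical. In Flink SQL, a time-windowed join bounds the time attribute of one input relative to the time attribute of the other, for example `s.rowtime BETWEEN b.rowtime - INTERVAL '15' MINUTE AND b.rowtime`, where `rowtime` stands for an assumed event-time attribute that neither table in the quoted environment file defines. Because both inputs carry time, buffered rows can be dropped once the watermark shows they can no longer find a partner. A predicate against CURRENT_TIMESTAMP gives no such bound, so the query is planned as a regular join that keeps the rows of both inputs in state.

```python
from dataclasses import dataclass, field


@dataclass
class IntervalJoinSketch:
    """Hypothetical model of an event-time interval join (not Flink code).

    A left row and a right row match when they share a key and
    right.time - bound <= left.time <= right.time. Rows are (key, time) tuples.
    """

    bound: int
    left: list = field(default_factory=list)    # buffered left rows
    right: list = field(default_factory=list)   # buffered right rows
    output: list = field(default_factory=list)  # (key, left_time, right_time)

    def _matches(self, lrow, rrow) -> bool:
        return lrow[0] == rrow[0] and rrow[1] - self.bound <= lrow[1] <= rrow[1]

    def add_left(self, key, time) -> None:
        row = (key, time)
        self.left.append(row)
        # Probe the rows buffered on the other side.
        self.output += [(key, time, r[1]) for r in self.right if self._matches(row, r)]

    def add_right(self, key, time) -> None:
        row = (key, time)
        self.right.append(row)
        self.output += [(key, lr[1], time) for lr in self.left if self._matches(lr, row)]

    def advance_watermark(self, wm) -> None:
        # Assumption: every row that arrives after this call has time >= wm.
        # A left row can still match a future right row only if time + bound >= wm.
        self.left = [lr for lr in self.left if lr[1] + self.bound >= wm]
        # A right row can still match a future left row only if its time >= wm.
        self.right = [r for r in self.right if r[1] >= wm]


# Left = sourceKafka-like rows, right = badips-like rows; times in minutes.
join = IntervalJoinSketch(bound=15)
join.add_right("10.0.0.1", 100)
join.add_left("10.0.0.1", 90)   # 10 minutes before the right row: match
join.add_left("10.0.0.1", 80)   # 20 minutes before: outside the bound
print(join.output)              # [('10.0.0.1', 90, 100)]
join.advance_watermark(110)
print(len(join.left), len(join.right))  # 0 0
```

The eviction rules follow from the assumption that later rows never fall behind the watermark: a left row can only meet a future right row within `bound` of its own time, and a right row is only useful while its own time is at or after the watermark, because it matches left rows that are no later than itself.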

What exactly are you trying to achieve with the query?

Best,
Fabian

On Wed, 18 Sep 2019 at 14:02, Nishant Gupta <nishantgupta1010@gmail.com> wrote:
> Hi Team,
>
> I am running a query for a time-windowed join as below:
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>
> For a time-windowed join, Flink SQL should automatically clear older records. Somehow, the query does not clear the heap space and fails with an error after some time.
>
> Can you please let me know what could be going wrong, or whether it is an issue?
>
> Environment file chunks:
>
> --------------------------------------------------------------------------------
> tables:
>   - name: sourceKafka
>     type: source-table
>     update-mode: append
>     connector:
>       type: kafka
>       version: "universal"
>       topic: test-data-flatten
>       properties:
>         - key: zookeeper.connect
>           value: x.x.x.x:2181
>         - key: bootstrap.servers
>           value: x.x.x.x:9092
>         - key: group.id
>           value: testgroup
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'source.ip': {
>               type: 'string'
>             },
>             'source.port': {
>               type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>     schema:
>       - name: ' source.ip '
>         type: VARCHAR
>       - name: 'source.port'
>         type: VARCHAR
>
>   - name: sourceKafkaMalicious
>     type: sink-table
>     update-mode: append
>     connector:
>       type: kafka
>       version: "universal"
>       topic: test-data-mal
>       properties:
>         - key: zookeeper.connect
>           value: x.x.x.x:2181
>         - key: bootstrap.servers
>           value: x.x.x.x:9092
>         - key: group.id
>           value: testgroupmal
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'source.ip': {
>               type: 'string'
>             },
>             'source.port': {
>               type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>     schema:
>       - name: ' source.ip '
>         type: VARCHAR
>       - name: 'source.port'
>         type: VARCHAR
>
>   - name: badips
>     type: source-table
>     #update-mode: append
>     connector:
>       type: filesystem
>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>     format:
>       type: csv
>       fields:
>         - name: ip
>           type: VARCHAR
>       comment-prefix: "#"
>     schema:
>       - name: ip
>         type: VARCHAR
>
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 1000000
>   parallelism: 3
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   restart-strategy:
>     type: fallback
>
> configuration:
>   table.optimizer.join-reorder-enabled: true
>   table.exec.spill-compression.enabled: true
>   table.exec.spill-compression.block-size: 128kb
> # Properties that describe the cluster to which table programs are submitted to.
>
> deployment:
>   response-timeout: 5000
>
> --------------------------------------------------------------------------------
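In the environment file above, `min-idle-state-retention` and `max-idle-state-retention` are both 0, which disables idle-state cleanup, so a regular join keeps the state for every join key indefinitely. As a rough illustration of how a min/max pair bounds that state, here is a minimal Python sketch. The names are hypothetical, it is a simplified model rather than Flink's code, and times are processing-time milliseconds. Each access schedules a cleanup at access time + max, and the schedule only moves when access time + min would fall after it, so state idle for less than min is kept and state idle for longer than max is removed.

```python
class IdleStateRetentionSketch:
    """Hypothetical per-key state with min/max idle-state retention.

    State of a key last accessed at time t is kept at least until
    t + min_retention and removed no later than t + max_retention.
    In this sketch, max_retention == 0 disables cleanup (the 0/0 setting above).
    """

    def __init__(self, min_retention: int, max_retention: int) -> None:
        self.min_retention = min_retention
        self.max_retention = max_retention
        self.state = {}       # key -> rows buffered for that key
        self.cleanup_at = {}  # key -> scheduled cleanup time

    def access(self, key, row, now: int) -> None:
        self.state.setdefault(key, []).append(row)
        if self.max_retention == 0:
            return  # cleanup disabled: the key's state is never removed
        scheduled = self.cleanup_at.get(key)
        # Move the cleanup later only if it would otherwise fire before
        # now + min_retention; this avoids rescheduling on every access.
        if scheduled is None or now + self.min_retention > scheduled:
            self.cleanup_at[key] = now + self.max_retention

    def advance_time(self, now: int) -> None:
        # Drop every key whose scheduled cleanup time has been reached.
        for key in [k for k, t in self.cleanup_at.items() if t <= now]:
            del self.cleanup_at[key]
            del self.state[key]


HOUR = 3_600_000  # milliseconds
store = IdleStateRetentionSketch(min_retention=1 * HOUR, max_retention=2 * HOUR)
store.access("10.0.0.1", "row-1", now=0)          # cleanup scheduled at 2h
store.access("10.0.0.1", "row-2", now=HOUR // 2)  # 0.5h + 1h <= 2h: unchanged
store.advance_time(2 * HOUR - 1)
print(len(store.state["10.0.0.1"]))  # 2
store.advance_time(2 * HOUR)
print("10.0.0.1" in store.state)     # False

disabled = IdleStateRetentionSketch(min_retention=0, max_retention=0)
disabled.access("10.0.0.1", "row-1", now=0)
disabled.advance_time(10**12)
print(len(disabled.state))           # 1
```

The min/max gap is the design trade-off: a wider gap means fewer timer updates per key, at the cost of keeping idle state somewhat longer.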