From: "Federico D'Ambrosio"
Date: Thu, 22 Feb 2018 12:05:41 +0100
Subject: Timestamp from Kafka record and watermark generation
To: user@flink.apache.org

Hello everyone,

I'm consuming from a Kafka topic that I write to with a FlinkKafkaProducer, with the timestamp-related flag set to true.

From what I gather from the documentation [1], Flink is already aware of the Kafka record's timestamp, so only the watermark needs to be set with an appropriate TimestampExtractor. Still, I'm failing to understand how to implement this correctly.

I thought it would be possible to use the existing AscendingTimestampExtractor and override its extractTimestamp method, but that method is marked final.

    new FlinkKafkaConsumer010[Event](ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
      .setStartFromLatest()
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
        def extractAscendingTimestamp(element: Event): Long = ???
      })

Do I need to implement my own TimestampExtractor (with the appropriate getCurrentWatermark and extractTimestamp methods)?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

Thank you,
Federico