Mailing-List: contact issues-help@flink.apache.org; run by ezmlm
Reply-To: dev@flink.apache.org
From: fhueske
To: issues@flink.apache.org
Reply-To: issues@flink.apache.org
Subject: [GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...
Content-Type: text/plain
Message-Id: <20180126175405.D286BF3518@git1-us-west.apache.org>
Date: Fri, 26 Jan 2018 17:54:05 +0000 (UTC)

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164166552

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    +    timestampExtractor = Some(new StreamRecordTimestamp)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    +    *
    +    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    +    *                  from the physical type.
    +    */
    +  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
    +    timestampExtractor = Some(extractor)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for ascending rowtime attributes.
    +    *
    +    * Emits a watermark of the maximum observed timestamp so far minus 1.
    +    * Rows that have a timestamp equal to the max timestamp are not late.
    +    */
    +  def watermarkPeriodicAscending(): Rowtime = {
    --- End diff --

    `periodicAscendingWatermarks()`?

---
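
For illustration, a minimal standalone sketch of how the suggested name might read in a fluent call chain, together with the ascending-watermark rule described in the Scaladoc of the quoted diff (watermark = maximum observed timestamp so far minus 1). `RowtimeSketch`, `AscendingWatermarkSketch`, `SketchDemo`, and their members are hypothetical stand-ins and not the Flink classes under review; only the method names `timestampFromField` and `periodicAscendingWatermarks` are taken from this thread. The lateness check assumes a row counts as late when its timestamp is at or below the current watermark.

```scala
// Hypothetical stand-in for the Rowtime descriptor in the diff (not Flink code).
final class RowtimeSketch {
  private var timestampField: Option[String] = None
  private var ascendingWatermarks: Boolean = false

  // Same name as in the diff: use an existing field as the rowtime source.
  def timestampFromField(fieldName: String): RowtimeSketch = {
    timestampField = Some(fieldName)
    this
  }

  // Name proposed in the review comment.
  def periodicAscendingWatermarks(): RowtimeSketch = {
    ascendingWatermarks = true
    this
  }

  // Illustrative helper so the configured state can be inspected.
  def summary: String = {
    val field = timestampField.getOrElse("none")
    s"field=$field, ascending=$ascendingWatermarks"
  }
}

// Hypothetical helper modelling the rule stated in the Scaladoc.
object AscendingWatermarkSketch {
  // Watermark is the maximum observed timestamp minus 1; None if nothing was seen.
  def watermarkFor(timestamps: Seq[Long]): Option[Long] =
    if (timestamps.isEmpty) None else Some(timestamps.max - 1)

  // Assumption: a row is late when its timestamp does not exceed the watermark.
  def isLate(timestamp: Long, watermark: Long): Boolean =
    timestamp <= watermark
}

object SketchDemo {
  def main(args: Array[String]): Unit = {
    val rowtime = new RowtimeSketch()
      .timestampFromField("ts")
      .periodicAscendingWatermarks()
    println(rowtime.summary)

    val watermark = AscendingWatermarkSketch.watermarkFor(Seq(5L, 9L, 7L))
    // A row carrying the maximum timestamp (9) is not late under this rule.
    println(watermark.map(w => AscendingWatermarkSketch.isLate(9L, w)))
  }
}
```

With the noun-last name, the chain reads as "timestamp from field, periodic ascending watermarks", which seems to be the point of the suggestion; whether the other watermark methods in the PR should follow the same pattern is left to the thread.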