Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 750CB200CC8 for ; Fri, 30 Jun 2017 07:02:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7390E160BF7; Fri, 30 Jun 2017 05:02:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 92929160BED for ; Fri, 30 Jun 2017 07:02:03 +0200 (CEST) Received: (qmail 1106 invoked by uid 500); 30 Jun 2017 05:02:02 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 1097 invoked by uid 99); 30 Jun 2017 05:02:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Jun 2017 05:02:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 5EF701AF923 for ; Fri, 30 Jun 2017 05:02:02 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -99.202 X-Spam-Level: X-Spam-Status: No, score=-99.202 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id QFgBvXmJ0bB0 for ; Fri, 30 Jun 2017 05:02:01 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id CF7FD5F36B for ; Fri, 30 Jun 2017 05:02:00 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 53C71E0026 for ; Fri, 30 Jun 2017 05:02:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 0F256245B9 for ; Fri, 30 Jun 2017 05:02:00 +0000 (UTC) Date: Fri, 30 Jun 2017 05:02:00 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Fri, 30 Jun 2017 05:02:04 -0000 [ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069483#comment-16069483 ] ASF GitHub Bot commented on FLINK-6352: --------------------------------------- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3915 @zjureel there seems to be a failure in the Kafka tests caused by this PR, could you have a look? >Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 2.875 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase Time elapsed: 2.874 sec <<< FAILURE! java.lang.AssertionError: Test setup failed: null at org.junit.Assert.fail(Assert.java:88) at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:226) at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:45) at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:138) at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:98) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) Results : Failed tests: Kafka010ITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null Kafka010ProducerITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null > FlinkKafkaConsumer should support to use timestamp to set up start offset > ------------------------------------------------------------------------- > > Key: FLINK-6352 > URL: https://issues.apache.org/jira/browse/FLINK-6352 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector > Reporter: Fang Yong > Assignee: Fang Yong > Fix For: 1.4.0 > > > Currently "auto.offset.reset" is used to initialize the start offset of FlinkKafkaConsumer, and the value should be earliest/latest/none. This method can only let the job comsume the beginning or the most recent data, but can not specify the specific offset of Kafka began to consume. > So, there should be a configuration item (such as "flink.source.start.time" and the format is "yyyy-MM-dd HH:mm:ss") that allows user to configure the initial offset of Kafka. The action of "flink.source.start.time" is as follows: > 1) job start from checkpoint / savepoint > a> offset of partition can be restored from checkpoint/savepoint, "flink.source.start.time" will be ignored. > b> there's no checkpoint/savepoint for the partition (For example, this partition is newly increased), the "flink.kafka.start.time" will be used to initialize the offset of the partition > 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used to initialize the offset of the kafka > a> the "flink.source.start.time" is valid, use it to set the offset of kafka > b> the "flink.source.start.time" is out-of-range, the same as it does currently with no initial offset, get kafka's current offset and start reading -- This message was sent by Atlassian JIRA (v6.4.14#64029)