flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [1/2] flink git commit: [FLINK-5625] [kinesis] Configurable date format for timestamp-based start position in FlinkKinesisConsumer
Date Fri, 31 Mar 2017 04:38:30 GMT
Repository: flink
Updated Branches:
  refs/heads/master ee033c903 -> 193224017


[FLINK-5625] [kinesis] Configurable date format for timestamp-based start position in FlinkKinesisConsumer

This closes #3651.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a119a30d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a119a30d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a119a30d

Branch: refs/heads/master
Commit: a119a30da91bc88b6b0242622adff2189b0a8fa4
Parents: ee033c9
Author: Tony Wei <tony19920430@gmail.com>
Authored: Thu Mar 30 09:48:43 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri Mar 31 12:32:19 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  |  8 ++-
 .../kinesis/config/ConsumerConfigConstants.java |  7 +-
 .../kinesis/internals/ShardConsumer.java        | 12 +++-
 .../kinesis/util/KinesisConfigUtil.java         | 23 +++---
 .../kinesis/FlinkKinesisConsumerTest.java       | 75 ++++++++++++++++----
 5 files changed, 95 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 59f3d61..ef1afca 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -121,9 +121,11 @@ one of the following values in the provided configuration properties
(the naming
 - `LATEST`: read all shards of all streams starting from the latest record.
 - `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible
(data may be trimmed by Kinesis depending on the retention settings).
 - `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The
timestamp must also be specified in the configuration
-properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either
in the date pattern
-`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative
double value representing the number of seconds
-that has elapsed since the Unix epoch (for example, `1459799926.480`).
+properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following formats:
+    - a non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`).
+    - a date string matching a user-defined pattern, which must be a valid `SimpleDateFormat` pattern provided via `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
+    If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, the default pattern `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` is used
+    (for example, the timestamp value `2016-04-04` with the user-given pattern `yyyy-MM-dd`, or the timestamp value `2016-04-04T19:58:46.480-00:00` with no pattern given).
 
 #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 4ffe0ad..7c31af4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -56,9 +56,12 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The initial position to start reading Kinesis streams from (LATEST is used if not set)
*/
 	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
 
-	/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set
for STREAM_INITIAL_POSITION */
+	/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set
for STREAM_INITIAL_POSITION) */
 	public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
 
+	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP
is set for STREAM_INITIAL_POSITION) */
+	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
+
 	/** The base backoff time between each describeStream attempt */
 	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
 
@@ -107,6 +110,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
 
+	public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
 
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index f6c53ce..ca85854 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +37,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -115,9 +115,15 @@ public class ShardConsumer<T> implements Runnable {
 
 		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
{
 			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+
 			try {
-				this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
-			} catch (ParseException e) {
+				String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
+					ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+				SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
+				this.initTimestamp = customDateFormat.parse(timestamp);
+			} catch (IllegalArgumentException | NullPointerException exception) {
+				throw new IllegalArgumentException(exception);
+			} catch (ParseException exception) {
 				this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
 			}
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 59b8529..244f5a5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -37,8 +37,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
-	public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
-
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
 	 */
@@ -67,7 +65,9 @@ public class KinesisConfigUtil {
 					throw new IllegalArgumentException("Please set value for initial timestamp ('"
 						+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial
position.");
 				}
-				validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+				validateOptionalDateProperty(config,
+					ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+					config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
 					"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.
"
 						+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value.
For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
 			}
@@ -222,17 +222,20 @@ public class KinesisConfigUtil {
 		}
 	}
 
-	private static void validateOptionalDateProperty(Properties config, String key, String message)
{
-		if (config.containsKey(key)) {
+	private static void validateOptionalDateProperty(Properties config, String timestampKey,
String format, String message) {
+		if (config.containsKey(timestampKey)) {
 			try {
-				initTimestampDateFormat.parse(config.getProperty(key));
-			} catch (ParseException parseException) {
+				SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
+				customDateFormat.parse(config.getProperty(timestampKey));
+			} catch (IllegalArgumentException | NullPointerException exception) {
+				throw new IllegalArgumentException(message);
+			} catch (ParseException exception) {
 				try {
-					double value = Double.parseDouble(config.getProperty(key));
+					double value = Double.parseDouble(config.getProperty(timestampKey));
 					if (value < 0) {
-						throw new NumberFormatException();
+						throw new IllegalArgumentException(message);
 					}
-				} catch (NumberFormatException numberFormatException){
+				} catch (NumberFormatException numberFormatException) {
 					throw new IllegalArgumentException(message);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 45eb1bd..741f0ca 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -40,7 +40,7 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -183,7 +183,7 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	@Test
-	public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() {
+	public void testDateStringForValidateOptionDateProperty() {
 		String timestamp = "2016-04-04T19:58:46.480-00:00";
 
 		Properties testConfig = new Properties();
@@ -194,18 +194,16 @@ public class FlinkKinesisConsumerTest {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
 
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-
 		try {
-			KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
-		} catch (ParseException e){
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail();
 		}
 	}
 
 	@Test
-	public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() {
+	public void testUnixTimestampForValidateOptionDateProperty() {
 		String unixTimestamp = "1459799926.480";
 
 		Properties testConfig = new Properties();
@@ -216,14 +214,65 @@ public class FlinkKinesisConsumerTest {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
 
+		try {
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail();
+		}
+	}
+
+	@Test
+	public void testInvalidPatternForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial
position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
+
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
 
-		try{
-			double value = Double.parseDouble(unixTimestamp);
-			if (value < 0) {
-				throw new NumberFormatException();
-			}
-		} catch (Exception e){
+	@Test
+	public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial
position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
+		String unixTimestamp = "2016-04-04";
+		String pattern = "yyyy-MM-dd";
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
+
+		try {
+			KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail();
 		}


Mime
View raw message