flink-commits mailing list archives

From tzuli...@apache.org
Subject [2/2] flink git commit: [FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date
Date Tue, 24 Jan 2017 07:22:50 GMT
[FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date
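
A minimal usage sketch of the feature (not part of the commit): with the two new ConsumerConfigConstants keys introduced below, the consumer starts reading each shard at a given point in time instead of LATEST or TRIM_HORIZON. The stream name, credentials, and region here are placeholders, and the imports assume the Flink version this commit targets.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class AtTimestampExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties consumerConfig = new Properties();
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
            // new in this commit: start from a timestamp instead of LATEST / TRIM_HORIZON
            consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
            // either the date pattern or an epoch value in seconds is accepted (see KinesisConfigUtil below)
            consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");

            DataStream<String> kinesis = env.addSource(
                new FlinkKinesisConsumer<>("myStream", new SimpleStringSchema(), consumerConfig));

            kinesis.print();
            env.execute("Kinesis AT_TIMESTAMP example");
        }
    }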


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d8a5abf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d8a5abf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d8a5abf

Branch: refs/heads/master
Commit: 8d8a5abfcc4d2452a3ae46f18a3223b66588c191
Parents: a8e85a2
Author: Tony Wei <tony19920430@gmail.com>
Authored: Thu Dec 1 11:40:46 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Jan 24 14:20:07 2017 +0800

----------------------------------------------------------------------
 .../kinesis/config/ConsumerConfigConstants.java |  8 ++++-
 .../kinesis/internals/ShardConsumer.java        | 18 ++++++++++
 .../kinesis/model/SentinelSequenceNumber.java   |  4 +++
 .../connectors/kinesis/proxy/KinesisProxy.java  | 36 ++++++++++++++++++--
 .../kinesis/proxy/KinesisProxyInterface.java    |  8 +++--
 .../kinesis/util/KinesisConfigUtil.java         | 30 +++++++++++++++-
 .../kinesis/FlinkKinesisConsumerTest.java       | 32 +++++++++++++++++
 .../testutils/FakeKinesisBehavioursFactory.java |  8 ++---
 8 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 76c20ed..4ffe0ad 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -37,7 +37,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 		TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
 
 		/** Start reading from the latest incoming record */
-		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM),
+
+		/** Start reading from the record at the specified timestamp */
+		AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM);
 
 		private SentinelSequenceNumber sentinelSequenceNumber;
 
@@ -53,6 +56,9 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
 	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
 
+	/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+	public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
+
 	/** The base backoff time between each describeStream attempt */
 	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 612a4a7..f6c53ce 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -30,12 +30,15 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 
@@ -64,6 +67,8 @@ public class ShardConsumer<T> implements Runnable {
 
 	private SequenceNumber lastSequenceNum;
 
+	private Date initTimestamp;
+
 	/**
 	 * Creates a shard consumer.
 	 *
@@ -107,6 +112,17 @@ public class ShardConsumer<T> implements Runnable {
 		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
 			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
 			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+			try {
+				this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
+			} catch (ParseException e) {
+				this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
+			}
+		} else {
+			this.initTimestamp = null;
+		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -128,6 +144,8 @@ public class ShardConsumer<T> implements Runnable {
 				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
 			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
 				nextShardItr = null;
+			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
 			} else {
 				// we will be starting from an actual sequence number (due to restore from failure).
 				// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
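
A side note on the timestamp handling above: ShardConsumer first tries the date pattern from KinesisConfigUtil and, if that fails, treats the value as fractional epoch seconds. A standalone sketch of that fallback, using the two example values quoted in the validation message further below:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class InitTimestampParsing {

        // same pattern as KinesisConfigUtil.initTimestampDateFormat in this commit
        private static final SimpleDateFormat FORMAT =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

        // mirrors the fallback in ShardConsumer: try the date pattern first,
        // otherwise interpret the value as fractional epoch seconds
        static Date parseInitTimestamp(String timestamp) {
            try {
                return FORMAT.parse(timestamp);
            } catch (ParseException e) {
                return new Date((long) (Double.parseDouble(timestamp) * 1000));
            }
        }

        public static void main(String[] args) {
            System.out.println(parseInitTimestamp("2016-04-04T19:58:46.480-00:00"));
            System.out.println(parseInitTimestamp("1459799926.480"));
        }
    }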

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index 8182201..7f9dbbb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -35,6 +35,10 @@ public enum SentinelSequenceNumber {
 	 * start to be read from the earliest records that haven't expired yet */
 	SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
 
+	/** Flag value for shard's sequence numbers to indicate that the shard should
+	 * start to be read from the specified timestamp */
+	SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM( new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM") ),
+
 	/** Flag value to indicate that we have already read the last record of this shard
 	 * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
 	SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 0b0fccf..580555f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -42,6 +44,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Map;
 import java.util.Random;
+import java.util.Date;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -234,14 +237,41 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * {@inheritDoc}
 	 */
 	@Override
-	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+			.withStreamName(shard.getStreamName())
+			.withShardId(shard.getShard().getShardId())
+			.withShardIteratorType(shardIteratorType);
+
+		switch (ShardIteratorType.fromValue(shardIteratorType)) {
+			case TRIM_HORIZON:
+			case LATEST:
+				break;
+			case AT_TIMESTAMP:
+				if (startingMarker instanceof Date) {
+					getShardIteratorRequest.setTimestamp((Date) startingMarker);
+				} else {
+					throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
+				}
+				break;
+			case AT_SEQUENCE_NUMBER:
+			case AFTER_SEQUENCE_NUMBER:
+				if (startingMarker instanceof String) {
+					getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
+				} else {
+					throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
+				}
+		}
+		return getShardIterator(getShardIteratorRequest);
+	}
+
+	private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
 		GetShardIteratorResult getShardIteratorResult = null;
 
 		int attempt = 0;
 		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
 			try {
-				getShardIteratorResult =
-					kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+					getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
 			} catch (AmazonServiceException ex) {
 				if (isRecoverableException(ex)) {
 					long backoffMillis = fullJitterBackoff(

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 39ddc52..9f6d594 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -34,14 +34,16 @@ public interface KinesisProxyInterface {
 	 *
 	 * @param shard the shard to get the iterator
 	 * @param shardIteratorType the iterator type, defining how the shard is to be iterated
-	 *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
-	 * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
+	 *                          (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+	 * @param startingMarker should be {@code null} if shardIteratorType is TRIM_HORIZON or LATEST,
+	 *                       should be a {@code Date} value if shardIteratorType is AT_TIMESTAMP,
+	 *                       should be a {@code String} representing the sequence number if shardIteratorType is AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
 	 * @return shard iterator which can be used to read data from Kinesis
 	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
 	 *                              operation has exceeded the rate limit; this exception will be thrown
 	 *                              if the backoff is interrupted.
 	 */
-	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
+	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
 
 	/**
 	 * Get the next batch of data records using a specific shard iterator
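
To make the startingMarker contract above concrete, a short sketch of the three call shapes a caller of the updated interface would use (the proxy instance, shard, and sequence number are placeholders supplied by the caller):

    import java.util.Date;

    import com.amazonaws.services.kinesis.model.ShardIteratorType;
    import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;

    public class StartingMarkerExamples {

        // illustrative only: shows what the Object-typed startingMarker is expected
        // to be for each ShardIteratorType, per the Javadoc above
        static void examples(KinesisProxyInterface kinesis, KinesisStreamShard shard, String seqNum)
                throws InterruptedException {
            // TRIM_HORIZON / LATEST: no marker
            kinesis.getShardIterator(shard, ShardIteratorType.LATEST.toString(), null);

            // AT_TIMESTAMP: a java.util.Date
            kinesis.getShardIterator(shard, ShardIteratorType.AT_TIMESTAMP.toString(), new Date(1459799926480L));

            // AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER: the sequence number as a String
            kinesis.getShardIterator(shard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), seqNum);
        }
    }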

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index d8ea0a2..eb29d78 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -35,6 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
+	public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
 
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
@@ -47,7 +50,7 @@ public class KinesisConfigUtil {
 		if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
 			String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
 
-			// specified initial position in stream must be either LATEST or TRIM_HORIZON
+			// specified initial position in stream must be either LATEST, TRIM_HORIZON or AT_TIMESTAMP
 			try {
 				InitialPosition.valueOf(initPosType);
 			} catch (IllegalArgumentException e) {
@@ -57,6 +60,17 @@ public class KinesisConfigUtil {
 				}
 				throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
 			}
+
+			// specified initial timestamp in stream when using AT_TIMESTAMP
+			if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
+				if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
+					throw new IllegalArgumentException("Please set value for initial timestamp ('"
+						+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+				}
+				validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+					"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
+						+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
+			}
 		}
 
 		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
@@ -207,4 +221,18 @@ public class KinesisConfigUtil {
 			}
 		}
 	}
+
+	private static void validateOptionalDateProperty(Properties config, String key, String message) {
+		if (config.containsKey(key)) {
+			try {
+				initTimestampDateFormat.parse(config.getProperty(key));
+				double value = Double.parseDouble(config.getProperty(key));
+				if (value < 0) {
+					throw new NumberFormatException();
+				}
+			} catch (ParseException | NumberFormatException e) {
+				throw new IllegalArgumentException(message);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a72d8df..2cc0270 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -133,6 +133,38 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	@Test
+	public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Please set value for initial timestamp ('"
+			+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDateForInitialTimestampInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
 	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 65e6d4e..964ee76 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -54,7 +54,7 @@ public class FakeKinesisBehavioursFactory {
 			}
 
 			@Override
-			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 				return null;
 			}
 
@@ -121,7 +121,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 			if (!expiredOnceAlready) {
 				// for the first call, just return the iterator of the first batch of records
 				return "0";
@@ -180,7 +180,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 			// this will be called only one time per ShardConsumer;
 			// so, simply return the iterator of the first batch of records
 			return "0";
@@ -250,7 +250,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
 			return null;
 		}
 

