Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7EA1F200B4A for ; Tue, 5 Jul 2016 20:39:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7D340160A60; Tue, 5 Jul 2016 18:39:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4149C160A6F for ; Tue, 5 Jul 2016 20:39:06 +0200 (CEST) Received: (qmail 16407 invoked by uid 500); 5 Jul 2016 18:39:05 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 16325 invoked by uid 99); 5 Jul 2016 18:39:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jul 2016 18:39:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 48115E38B3; Tue, 5 Jul 2016 18:39:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.apache.org Date: Tue, 05 Jul 2016 18:39:07 -0000 Message-Id: <1dcb50729c214d6b99e83c3998fd9abc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] flink git commit: [FLINK-3231] FlinkKinesisConsumer rework to handle Kinesis resharding archived-at: Tue, 05 Jul 2016 18:39:08 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java index b21c5bb..53ed11b 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java @@ -49,51 +49,20 @@ public class KinesisStreamShard implements Serializable { this.streamName = checkNotNull(streamName); this.shard = checkNotNull(shard); - this.cachedHash = 37 * (streamName.hashCode() + shard.hashCode()); + // since our description of Kinesis Streams shards can be fully defined with the stream name and shard id, + // our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation + int hash = 17; + hash = 37 * hash + streamName.hashCode(); + hash = 37 * hash + shard.getShardId().hashCode(); + this.cachedHash = hash; } public String getStreamName() { return streamName; } - public String getShardId() { - return shard.getShardId(); - } - - public String getStartingSequenceNumber() { - return shard.getSequenceNumberRange().getStartingSequenceNumber(); - } - - public String getEndingSequenceNumber() { - return shard.getSequenceNumberRange().getEndingSequenceNumber(); - } - - public String getStartingHashKey() { - return shard.getHashKeyRange().getStartingHashKey(); - } - - public String getEndingHashKey() { - return shard.getHashKeyRange().getEndingHashKey(); - } - public boolean isClosed() { - return (getEndingSequenceNumber() != null); - } - - public String getParentShardId() { - return shard.getParentShardId(); - } - - public String getAdjacentParentShardId() { - return shard.getAdjacentParentShardId(); - } - - public boolean isSplitShard() { - return (getParentShardId() != null && getAdjacentParentShardId() == null); - } - - public boolean isMergedShard() { - return (getParentShardId() != null && getAdjacentParentShardId() != null); + return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null); } public Shard getShard() { @@ -104,13 +73,7 @@ public class KinesisStreamShard implements Serializable { public String toString() { return "KinesisStreamShard{" + "streamName='" + streamName + "'" + - ", shardId='" + getShardId() + "'" + - ", parentShardId='" + getParentShardId() + "'" + - ", adjacentParentShardId='" + getAdjacentParentShardId() + "'" + - ", startingSequenceNumber='" + getStartingSequenceNumber() + "'" + - ", endingSequenceNumber='" + getEndingSequenceNumber() + "'" + - ", startingHashKey='" + getStartingHashKey() + "'" + - ", endingHashKey='" + getEndingHashKey() + "'}"; + ", shard='" + shard.toString() + "'}"; } @Override @@ -132,4 +95,39 @@ public class KinesisStreamShard implements Serializable { public int hashCode() { return cachedHash; } + + /** + * Utility function to compare two shard ids + * + * @param firstShardId first shard id to compare + * @param secondShardId second shard id to compare + * @return a value less than 0 if the first shard id is smaller than the second shard id, + * or a value larger than 0 the first shard is larger then the second shard id, + * or 0 if they are equal + */ + public static int compareShardIds(String firstShardId, String secondShardId) { + if (!isValidShardId(firstShardId)) { + throw new IllegalArgumentException("The first shard id has invalid format."); + } + + if (!isValidShardId(secondShardId)) { + throw new IllegalArgumentException("The second shard id has invalid format."); + } + + // digit segment of the shard id starts at index 8 + return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8))); + } + + /** + * Checks if a shard id has valid format. + * Kinesis stream shard ids have 12-digit numbers left-padded with 0's, + * prefixed with "shardId-", ex. "shardId-000000000015". + * + * @param shardId the shard id to check + * @return whether the shard id is valid + */ + public static boolean isValidShardId(String shardId) { + if (shardId == null) { return false; } + return shardId.matches("^shardId-\\d{12}"); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java new file mode 100644 index 0000000..00181da --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +/** + * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number. + */ +public class KinesisStreamShardState { + + private KinesisStreamShard kinesisStreamShard; + private SequenceNumber lastProcessedSequenceNum; + + public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) { + this.kinesisStreamShard = kinesisStreamShard; + this.lastProcessedSequenceNum = lastProcessedSequenceNum; + } + + public KinesisStreamShard getKinesisStreamShard() { + return this.kinesisStreamShard; + } + + public SequenceNumber getLastProcessedSequenceNum() { + return this.lastProcessedSequenceNum; + } + + public void setLastProcessedSequenceNum(SequenceNumber update) { + this.lastProcessedSequenceNum = update; + } + + @Override + public String toString() { + return "KinesisStreamShardState{" + + "kinesisStreamShard='" + kinesisStreamShard.toString() + "'" + + ", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}"; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof KinesisStreamShardState)) { + return false; + } + + if (obj == this) { + return true; + } + + KinesisStreamShardState other = (KinesisStreamShardState) obj; + + return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum()); + } + + @Override + public int hashCode() { + return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java index 55752f8..8182201 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java @@ -27,10 +27,6 @@ import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetche */ public enum SentinelSequenceNumber { - /** Flag value to indicate that the sequence number of a shard is not set. This value is used - * as an initial value in {@link KinesisDataFetcher}'s constructor for all shard's sequence number. */ - SENTINEL_SEQUENCE_NUMBER_NOT_SET( new SequenceNumber("SEQUENCE_NUMBER_NOT_SET") ), - /** Flag value for shard's sequence numbers to indicate that the * shard should start to be read from the latest incoming records */ SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ), http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java new file mode 100644 index 0000000..04b1654 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; + +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Basic model class to bundle the shards retrieved from Kinesis on a {@link KinesisProxyInterface#getShardList(Map)} call. + */ +public class GetShardListResult { + + private final Map> streamsToRetrievedShardList = new HashMap<>(); + + public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + streamsToRetrievedShardList.put(stream, new LinkedList()); + } + streamsToRetrievedShardList.get(stream).add(retrievedShard); + } + + public void addRetrievedShardsToStream(String stream, List retrievedShards) { + if (retrievedShards.size() != 0) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + streamsToRetrievedShardList.put(stream, new LinkedList()); + } + streamsToRetrievedShardList.get(stream).addAll(retrievedShards); + } + } + + public List getRetrievedShardListOfStream(String stream) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + return null; + } else { + return streamsToRetrievedShardList.get(stream); + } + } + + public KinesisStreamShard getLastSeenShardOfStream(String stream) { + if (!streamsToRetrievedShardList.containsKey(stream)) { + return null; + } else { + return streamsToRetrievedShardList.get(stream).getLast(); + } + } + + public boolean hasRetrievedShards() { + return !streamsToRetrievedShardList.isEmpty(); + } + + public Set getStreamsWithRetrievedShards() { + return streamsToRetrievedShardList.keySet(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index d035c03..22f667e 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -17,21 +17,17 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.ClientConfigurationFactory; -import com.amazonaws.regions.Region; -import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.LimitExceededException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; -import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; @@ -41,174 +37,288 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.Map; +import java.util.Random; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A utility class that is used as a proxy to make calls to AWS Kinesis - * for several functions, such as getting a list of shards and fetching - * a batch of data records starting from a specified record sequence number. + * Kinesis proxy implementation - a utility class that is used as a proxy to make + * calls to AWS Kinesis for several functions, such as getting a list of shards and + * fetching a batch of data records starting from a specified record sequence number. * * NOTE: * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed * functionality for the Flink Kinesis Connecter since the consumer may simultaneously read from multiple Kinesis streams. */ -public class KinesisProxy { +public class KinesisProxy implements KinesisProxyInterface { private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ private final AmazonKinesisClient kinesisClient; - /** Configuration properties of this Flink Kinesis Connector */ - private final Properties configProps; + /** Random seed used to calculate backoff jitter for Kinesis operations */ + private final static Random seed = new Random(); + + // ------------------------------------------------------------------------ + // describeStream() related performance settings + // ------------------------------------------------------------------------ + + /** Base backoff millis for the describe stream operation */ + private final long describeStreamBaseBackoffMillis; + + /** Maximum backoff millis for the describe stream operation */ + private final long describeStreamMaxBackoffMillis; + + /** Exponential backoff power constant for the describe stream operation */ + private final double describeStreamExpConstant; + + // ------------------------------------------------------------------------ + // getRecords() related performance settings + // ------------------------------------------------------------------------ + + /** Base backoff millis for the get records operation */ + private final long getRecordsBaseBackoffMillis; + + /** Maximum backoff millis for the get records operation */ + private final long getRecordsMaxBackoffMillis; + + /** Exponential backoff power constant for the get records operation */ + private final double getRecordsExpConstant; + + /** Maximum attempts for the get records operation */ + private final int getRecordsMaxAttempts; + + // ------------------------------------------------------------------------ + // getShardIterator() related performance settings + // ------------------------------------------------------------------------ + + /** Base backoff millis for the get shard iterator operation */ + private final long getShardIteratorBaseBackoffMillis; + + /** Maximum backoff millis for the get shard iterator operation */ + private final long getShardIteratorMaxBackoffMillis; + + /** Exponential backoff power constant for the get shard iterator operation */ + private final double getShardIteratorExpConstant; + + /** Maximum attempts for the get shard iterator operation */ + private final int getShardIteratorMaxAttempts; /** * Create a new KinesisProxy based on the supplied configuration properties * * @param configProps configuration properties containing AWS credential and AWS region info */ - public KinesisProxy(Properties configProps) { - this.configProps = checkNotNull(configProps); + private KinesisProxy(Properties configProps) { + checkNotNull(configProps); + + this.kinesisClient = AWSUtil.createKinesisClient(configProps); + + this.describeStreamBaseBackoffMillis = Long.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, + Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE))); + this.describeStreamMaxBackoffMillis = Long.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, + Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX))); + this.describeStreamExpConstant = Double.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, + Double.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))); - /* The AWS region that this proxy will be making calls to */ - String regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION); - // set Flink as a user agent - ClientConfiguration config = new ClientConfigurationFactory().getConfig(); - config.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector"); - AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), config); + this.getRecordsBaseBackoffMillis = Long.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, + Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE))); + this.getRecordsMaxBackoffMillis = Long.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, + Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX))); + this.getRecordsExpConstant = Double.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, + Double.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT))); + this.getRecordsMaxAttempts = Integer.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, + Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES))); - client.setRegion(Region.getRegion(Regions.fromName(regionId))); + this.getShardIteratorBaseBackoffMillis = Long.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, + Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE))); + this.getShardIteratorMaxBackoffMillis = Long.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, + Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX))); + this.getShardIteratorExpConstant = Double.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, + Double.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT))); + this.getShardIteratorMaxAttempts = Integer.valueOf( + configProps.getProperty( + KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, + Long.toString(KinesisConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES))); - this.kinesisClient = client; } /** - * Get the next batch of data records using a specific shard iterator + * Creates a Kinesis proxy. * - * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading - * @param maxRecordsToGet the maximum amount of records to retrieve for this batch - * @return the batch of retrieved records + * @param configProps configuration properties + * @return the created kinesis proxy + */ + public static KinesisProxyInterface create(Properties configProps) { + return new KinesisProxy(configProps); + } + + /** + * {@inheritDoc} */ - public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { + @Override + public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecordsToGet); GetRecordsResult getRecordsResult = null; - int remainingRetryTimes = Integer.valueOf( - configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES))); - long describeStreamBackoffTimeInMillis = Long.valueOf( - configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF))); - - int i=0; - while (i <= remainingRetryTimes && getRecordsResult == null) { + int attempt = 0; + while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); } catch (ProvisionedThroughputExceededException ex) { + long backoffMillis = fullJitterBackoff( + getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + describeStreamBackoffTimeInMillis + " millis."); - try { - Thread.sleep(describeStreamBackoffTimeInMillis); - } catch (InterruptedException interruptEx) { - // - } + + backoffMillis + " millis."); + Thread.sleep(backoffMillis); } - i++; } if (getRecordsResult == null) { - throw new RuntimeException("Rate Exceeded"); + throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + "retry" + + "attempts returned ProvisionedThroughputExceededException."); } return getRecordsResult; } /** - * Get the list of shards associated with multiple Kinesis streams - * - * @param streamNames the list of Kinesis streams - * @return a list of {@link KinesisStreamShard}s + * {@inheritDoc} */ - public List getShardList(List streamNames) { - List shardList = new ArrayList<>(); + @Override + public GetShardListResult getShardList(Map streamNamesWithLastSeenShardIds) throws InterruptedException { + GetShardListResult result = new GetShardListResult(); - for (String stream : streamNames) { - DescribeStreamResult describeStreamResult; - String lastSeenShardId = null; - - do { - describeStreamResult = describeStream(stream, lastSeenShardId); - - List shards = describeStreamResult.getStreamDescription().getShards(); - for (Shard shard : shards) { - shardList.add(new KinesisStreamShard(stream, shard)); - } - lastSeenShardId = shards.get(shards.size() - 1).getShardId(); - } while (describeStreamResult.getStreamDescription().isHasMoreShards()); + for (Map.Entry streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) { + String stream = streamNameWithLastSeenShardId.getKey(); + String lastSeenShardId = streamNameWithLastSeenShardId.getValue(); + result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId)); } - return shardList; + return result; } /** - * Get a shard iterator for a Kinesis shard - * - * @param shard the shard to get the iterator for - * @param shardIteratorType the iterator type to get - * @param startingSeqNum the sequence number that the iterator will start from - * @return the shard iterator + * {@inheritDoc} */ - public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { - return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator(); + @Override + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException { + GetShardIteratorResult getShardIteratorResult = null; + + int attempt = 0; + while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { + try { + getShardIteratorResult = + kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); + } catch (ProvisionedThroughputExceededException ex) { + long backoffMillis = fullJitterBackoff( + getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); + LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " + + backoffMillis + " millis."); + Thread.sleep(backoffMillis); + } + } + + if (getShardIteratorResult == null) { + throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + "retry" + + "attempts returned ProvisionedThroughputExceededException."); + } + return getShardIteratorResult.getShardIterator(); + } + + private List getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException { + List shardsOfStream = new ArrayList<>(); + + DescribeStreamResult describeStreamResult; + do { + describeStreamResult = describeStream(streamName, lastSeenShardId); + + List shards = describeStreamResult.getStreamDescription().getShards(); + for (Shard shard : shards) { + shardsOfStream.add(new KinesisStreamShard(streamName, shard)); + } + + if (shards.size() != 0) { + lastSeenShardId = shards.get(shards.size() - 1).getShardId(); + } + } while (describeStreamResult.getStreamDescription().isHasMoreShards()); + + return shardsOfStream; } /** * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. * + * This method is using a "full jitter" approach described in AWS's article, + * "Exponential Backoff and Jitter". + * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This + * jitter backoff approach will help distribute calls across the fetchers over time. + * * @param streamName the stream to describe * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, String startShardId) { + private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult describeStreamResult = null; - String streamStatus = null; - int remainingRetryTimes = Integer.valueOf( - configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES))); - long describeStreamBackoffTimeInMillis = Long.valueOf( - configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF))); - - // Call DescribeStream, with backoff and retries (if we get LimitExceededException). - while ((remainingRetryTimes >= 0) && (describeStreamResult == null)) { + + // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + int attemptCount = 0; + while (describeStreamResult == null) { // retry until we get a result try { describeStreamResult = kinesisClient.describeStream(describeStreamRequest); - streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); } catch (LimitExceededException le) { + long backoffMillis = fullJitterBackoff( + describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + describeStreamBackoffTimeInMillis + " millis."); - try { - Thread.sleep(describeStreamBackoffTimeInMillis); - } catch (InterruptedException ie) { - LOG.debug("Stream " + streamName + " : Sleep was interrupted ", ie); - } + + backoffMillis + " millis."); + Thread.sleep(backoffMillis); } catch (ResourceNotFoundException re) { throw new RuntimeException("Error while getting stream details", re); } - remainingRetryTimes--; } - if (streamStatus == null) { - throw new RuntimeException("Can't get stream info from after 3 retries due to LimitExceededException"); - } else if (streamStatus.equals(StreamStatus.ACTIVE.toString()) || - streamStatus.equals(StreamStatus.UPDATING.toString())) { - return describeStreamResult; - } else { - throw new RuntimeException("Stream is not Active or Updating"); + String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); + if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { + if (LOG.isWarnEnabled()) { + LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + + "describeStream operation will not contain any shard information."); + } } + + return describeStreamResult; + } + + private static long fullJitterBackoff(long base, long max, double power, int attempt) { + long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt)); + return (long)(seed.nextDouble()*exponentialBackoff); // random jitter between 0 and the exponential backoff } } http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java new file mode 100644 index 0000000..39ddc52 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; + +import java.util.Map; + +/** + * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region. + */ +public interface KinesisProxyInterface { + + /** + * Get a shard iterator from the specified position in a shard. + * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}} + * to read data from the Kinesis shard. + * + * @param shard the shard to get the iterator + * @param shardIteratorType the iterator type, defining how the shard is to be iterated + * (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER) + * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST + * @return shard iterator which can be used to read data from Kinesis + * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the + * operation has exceeded the rate limit; this exception will be thrown + * if the backoff is interrupted. + */ + String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException; + + /** + * Get the next batch of data records using a specific shard iterator + * + * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading + * @param maxRecordsToGet the maximum amount of records to retrieve for this batch + * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch + * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the + * operation has exceeded the rate limit; this exception will be thrown + * if the backoff is interrupted. + */ + GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException; + + /** + * Get shard list of multiple Kinesis streams, ignoring the + * shards of each stream before a specified last seen shard id. + * + * @param streamNamesWithLastSeenShardIds a map with stream as key, and last seen shard id as value + * @return result of the shard list query + * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the + * operation has exceeded the rate limit; this exception will be thrown + * if the backoff is interrupted. + */ + GetShardListResult getShardList(Map streamNamesWithLastSeenShardIds) throws InterruptedException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index 187f098..2eec0a4 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -17,13 +17,18 @@ package org.apache.flink.streaming.connectors.kinesis.util; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.auth.SystemPropertiesCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; +import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; @@ -35,6 +40,23 @@ import java.util.Properties; public class AWSUtil { /** + * Creates an Amazon Kinesis Client. + * @param configProps configuration properties containing the access key, secret key, and region + * @return a new Amazon Kinesis Client + */ + public static AmazonKinesisClient createKinesisClient(Properties configProps) { + // set a Flink-specific user agent + ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); + awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + + " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector"); + + AmazonKinesisClient client = + new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), awsClientConfig); + client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)))); + return client; + } + + /** * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties. * * @param configProps the configuration properties http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 042b168..8e442de 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -21,6 +21,7 @@ import com.amazonaws.regions.Regions; import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import java.util.Properties; @@ -98,36 +99,98 @@ public class KinesisConfigUtil { } } - validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, - "Invalid value given for describeStream stream operation retry count. Must be a valid integer value."); + validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, + "Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value."); - validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET, - "Invalid value given for maximum records per getRecords shard operation. Must be a valid integer value."); + validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, + "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value."); - validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, - "Invalid value given for describeStream stream operation backoff milliseconds. Must be a valid long value."); + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, + "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value"); - validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, - "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid long value."); + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, + "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value"); - validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT, - "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid long value."); + validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value"); + + validateOptionalPositiveIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, + "Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value."); + + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, + "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value"); + + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, + "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value"); + + validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value"); + + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, + "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value"); + + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, + "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value"); + + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, + "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value"); + + validateOptionalPositiveDoubleProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, + "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value"); + + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, + "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value."); + + validateOptionalPositiveLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT, + "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value."); } - private static void validateOptionalLongProperty(Properties config, String key, String message) { + public static SentinelSequenceNumber getInitialPositionAsSentinelSequenceNumber(Properties config) { + InitialPosition initialPosition = InitialPosition.valueOf(config.getProperty( + KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, InitialPosition.LATEST.toString())); + + switch (initialPosition) { + case TRIM_HORIZON: + return SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM; + case LATEST: + default: + return SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM; + } + } + + private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) { + if (config.containsKey(key)) { + try { + long value = Long.parseLong(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException(message); + } + } + } + + private static void validateOptionalPositiveIntProperty(Properties config, String key, String message) { if (config.containsKey(key)) { try { - Long.parseLong(config.getProperty(key)); + int value = Integer.parseInt(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } } catch (NumberFormatException e) { throw new IllegalArgumentException(message); } } } - private static void validateOptionalIntProperty(Properties config, String key, String message) { + private static void validateOptionalPositiveDoubleProperty(Properties config, String key, String message) { if (config.containsKey(key)) { try { - Integer.parseInt(config.getProperty(key)); + double value = Double.parseDouble(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } } catch (NumberFormatException e) { throw new IllegalArgumentException(message); } http://git-wip-us.apache.org/repos/asf/flink/blob/17dfd68d/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 5ced019..ec9ee9a 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -17,16 +17,18 @@ package org.apache.flink.streaming.connectors.kinesis; +import com.amazonaws.services.kinesis.model.Shard; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; -import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; -import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; -import org.apache.flink.streaming.connectors.kinesis.testutils.ReferenceKinesisShardTopologies; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -36,26 +38,15 @@ import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.mockStatic; -import static org.powermock.api.mockito.PowerMockito.whenNew; /** - * Suite of FlinkKinesisConsumer tests, including utility static method tests, - * and tests for the methods called throughout the source life cycle with mocked KinesisProxy. + * Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle. */ @RunWith(PowerMockRunner.class) @PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class}) @@ -148,365 +139,282 @@ public class FlinkKinesisConsumerTest { } @Test - public void testUnparsableIntForDescribeStreamRetryCountInConfig() { + public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("Invalid value given for describeStream stream operation retry count"); + exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds"); Properties testConfig = new Properties(); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, "unparsableInt"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); KinesisConfigUtil.validateConfiguration(testConfig); } @Test - public void testUnparsableLongForDescribeStreamBackoffMillisInConfig() { + public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("Invalid value given for describeStream stream operation backoff milliseconds"); + exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds"); Properties testConfig = new Properties(); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, "unparsableLong"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); KinesisConfigUtil.validateConfiguration(testConfig); } @Test - public void testUnparsableIntForGetRecordsMaxCountInConfig() { + public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() { exception.expect(IllegalArgumentException.class); - exception.expectMessage("Invalid value given for maximum records per getRecords shard operation"); + exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant"); Properties testConfig = new Properties(); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET, "unparsableInt"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); KinesisConfigUtil.validateConfiguration(testConfig); } - // ---------------------------------------------------------------------- - // FlinkKinesisConsumer.assignShards() tests - // ---------------------------------------------------------------------- - @Test - public void testShardNumEqualConsumerNum() { - try { - List fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards(); - int consumerTaskCount = fakeShards.size(); - - for (int consumerNum=0; consumerNum < consumerTaskCount; consumerNum++) { - List assignedShardsToThisConsumerTask = - FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum); - - // the ith consumer should be assigned exactly 1 shard, - // which is always the ith shard of a shard list that only has open shards - assertEquals(1, assignedShardsToThisConsumerTask.size()); - assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum))); - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testUnparsableIntForGetRecordsRetriesInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation"); + + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_RETRIES, "unparsableInt"); + + KinesisConfigUtil.validateConfiguration(testConfig); } @Test - public void testShardNumFewerThanConsumerNum() { - try { - List fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards(); - int consumerTaskCount = fakeShards.size() + 3; - - for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) { - List assignedShardsToThisConsumerTask = - FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum); - - // for ith consumer with i < the total num of shards, - // the ith consumer should be assigned exactly 1 shard, - // which is always the ith shard of a shard list that only has open shards; - // otherwise, the consumer should not be assigned any shards - if (consumerNum < fakeShards.size()) { - assertEquals(1, assignedShardsToThisConsumerTask.size()); - assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum))); - } else { - assertEquals(0, assignedShardsToThisConsumerTask.size()); - } - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testUnparsableIntForGetRecordsMaxCountInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for maximum records per getRecords shard operation"); + + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX, "unparsableInt"); + + KinesisConfigUtil.validateConfiguration(testConfig); } @Test - public void testShardNumMoreThanConsumerNum() { - try { - List fakeShards = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards(); - int consumerTaskCount = fakeShards.size() - 1; - - for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) { - List assignedShardsToThisConsumerTask = - FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum); - - // since the number of consumer tasks is short by 1, - // all but the first consumer task should be assigned 1 shard, - // while the first consumer task is assigned 2 shards - if (consumerNum != 0) { - assertEquals(1, assignedShardsToThisConsumerTask.size()); - assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(consumerNum))); - } else { - assertEquals(2, assignedShardsToThisConsumerTask.size()); - assertTrue(assignedShardsToThisConsumerTask.get(0).equals(fakeShards.get(0))); - assertTrue(assignedShardsToThisConsumerTask.get(1).equals(fakeShards.get(fakeShards.size()-1))); - } - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for get records operation base backoff milliseconds"); + + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); + + KinesisConfigUtil.validateConfiguration(testConfig); } @Test - public void testAssignEmptyShards() { - try { - List fakeShards = new ArrayList<>(0); - int consumerTaskCount = 4; - - for (int consumerNum = 0; consumerNum < consumerTaskCount; consumerNum++) { - List assignedShardsToThisConsumerTask = - FlinkKinesisConsumer.assignShards(fakeShards, consumerTaskCount, consumerNum); - - // should not be assigned anything - assertEquals(0, assignedShardsToThisConsumerTask.size()); - - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for get records operation max backoff milliseconds"); - // ---------------------------------------------------------------------- - // Constructor tests with mocked KinesisProxy - // ---------------------------------------------------------------------- + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); + + KinesisConfigUtil.validateConfiguration(testConfig); + } @Test - public void testConstructorShouldThrowRuntimeExceptionIfUnableToFindAnyShards() { - exception.expect(RuntimeException.class); - exception.expectMessage("Unable to retrieve any shards"); - - Properties testConsumerConfig = new Properties(); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - - // get a consumer that will not be able to find any shards from AWS Kinesis - FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy( - 6, 2, "fake-consumer-task-name", - new ArrayList(), new ArrayList(), testConsumerConfig, - null, null, false, false); + public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for get records operation backoff exponential constant"); + + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); + + KinesisConfigUtil.validateConfiguration(testConfig); } - // ---------------------------------------------------------------------- - // Tests for open() source life cycle method - // ---------------------------------------------------------------------- + @Test + public void testUnparsableIntForGetShardIteratorRetriesInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation"); + + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_RETRIES, "unparsableInt"); + + KinesisConfigUtil.validateConfiguration(testConfig); + } @Test - public void testOpenWithNoRestoreStateFetcherAdvanceToLatestSentinelSequenceNumberWhenConfigSetToStartFromLatest() throws Exception { - - int fakeNumConsumerTasks = 6; - int fakeThisConsumerTaskIndex = 2; - String fakeThisConsumerTaskName = "fake-this-task-name"; - - List fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards(); - List fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3); - - Properties testConsumerConfig = new Properties(); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); - - KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class); - try { - whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock); - } catch (Exception e) { - throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e); - } + public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds"); - FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy( - fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName, - fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig, - null, null, false, false); + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); - dummyConsumer.open(new Configuration()); + KinesisConfigUtil.validateConfiguration(testConfig); + } - for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) { - verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()); - } + @Test + public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds"); + + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); + KinesisConfigUtil.validateConfiguration(testConfig); } @Test - public void testOpenWithNoRestoreStateFetcherAdvanceToEarliestSentinelSequenceNumberWhenConfigSetToTrimHorizon() throws Exception { - - int fakeNumConsumerTasks = 6; - int fakeThisConsumerTaskIndex = 2; - String fakeThisConsumerTaskName = "fake-this-task-name"; - - List fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards(); - List fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3); - - Properties testConsumerConfig = new Properties(); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON"); - - KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class); - try { - whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock); - } catch (Exception e) { - throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e); - } + public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant"); - FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy( - fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName, - fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig, - null, null, false, false); + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); - dummyConsumer.open(new Configuration()); + KinesisConfigUtil.validateConfiguration(testConfig); + } - for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) { - verify(kinesisDataFetcherMock).advanceSequenceNumberTo(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()); - } + @Test + public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds"); + Properties testConfig = new Properties(); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); + testConfig.setProperty(KinesisConfigConstants.CONFIG_SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); + + KinesisConfigUtil.validateConfiguration(testConfig); } + // ---------------------------------------------------------------------- + // Tests related to state initialization + // ---------------------------------------------------------------------- + @Test - public void testOpenWithRestoreStateFetcherAdvanceToCorrespondingSequenceNumbers() throws Exception { - - int fakeNumConsumerTasks = 6; - int fakeThisConsumerTaskIndex = 2; - String fakeThisConsumerTaskName = "fake-this-task-name"; - - List fakeCompleteShardList = ReferenceKinesisShardTopologies.flatTopologyWithFourOpenShards(); - List fakeAssignedShardsToThisConsumerTask = fakeCompleteShardList.subList(2,3); - - Properties testConsumerConfig = new Properties(); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKey"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - testConsumerConfig.setProperty(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON"); - - KinesisDataFetcher kinesisDataFetcherMock = Mockito.mock(KinesisDataFetcher.class); - try { - whenNew(KinesisDataFetcher.class).withArguments(fakeAssignedShardsToThisConsumerTask, testConsumerConfig, fakeThisConsumerTaskName).thenReturn(kinesisDataFetcherMock); - } catch (Exception e) { - throw new RuntimeException("Error when power mocking KinesisDataFetcher in test", e); - } + public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception { + Properties config = new Properties(); + config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); + config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, "accessKeyId"); + config.setProperty(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, "secretKey"); - FlinkKinesisConsumer dummyConsumer = getDummyConsumerWithMockedKinesisProxy( - fakeNumConsumerTasks, fakeThisConsumerTaskIndex, fakeThisConsumerTaskName, - fakeCompleteShardList, fakeAssignedShardsToThisConsumerTask, testConsumerConfig, - null, null, false, false); + FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); - // generate random UUIDs as sequence numbers of last checkpointed state for each assigned shard - ArrayList listOfSeqNumOfAssignedShards = new ArrayList<>(fakeAssignedShardsToThisConsumerTask.size()); - for (KinesisStreamShard shard : fakeAssignedShardsToThisConsumerTask) { - listOfSeqNumOfAssignedShards.add(new SequenceNumber(UUID.randomUUID().toString())); - } + assertTrue(consumer.snapshotState(123, 123) == null); //arbitrary checkpoint id and timestamp + } - HashMap fakeRestoredState = new HashMap<>(); - for (int i=0; i consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); + consumer.open(new Configuration()); // only opened, not run - for (int i=0; i fakeCompleteShardList, - List fakeAssignedShardListToThisConsumerTask, - Properties consumerTestConfig, - KinesisDataFetcher fetcher, - HashMap lastSequenceNumsToRestore, - boolean hasAssignedShards, - boolean running) { - - final String dummyKinesisStreamName = "flink-test"; - - final List dummyKinesisStreamList = Collections.singletonList(dummyKinesisStreamName); - - final KinesisProxy kinesisProxyMock = mock(KinesisProxy.class); - - // mock KinesisProxy that is instantiated in the constructor, as well as its getShardList call - try { - whenNew(KinesisProxy.class).withArguments(consumerTestConfig).thenReturn(kinesisProxyMock); - } catch (Exception e) { - throw new RuntimeException("Error when power mocking KinesisProxy in tests", e); - } + // ---------------------------------------------------------------------- + // Tests related to fetcher initialization + // ---------------------------------------------------------------------- - when(kinesisProxyMock.getShardList(dummyKinesisStreamList)).thenReturn(fakeCompleteShardList); + @Test + @SuppressWarnings("unchecked") + public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception { + KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class); + PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher); - TestableFlinkKinesisConsumer dummyConsumer = - new TestableFlinkKinesisConsumer(dummyKinesisStreamName, fakeNumFlinkConsumerTasks, - fakeThisConsumerTaskIndex, fakeThisConsumerTaskName, consumerTestConfig); + // assume the given config is correct + PowerMockito.mockStatic(KinesisConfigUtil.class); + PowerMockito.doNothing().when(KinesisConfigUtil.class); - try { - Field fetcherField = FlinkKinesisConsumer.class.getDeclaredField("fetcher"); - fetcherField.setAccessible(true); - fetcherField.set(dummyConsumer, fetcher); + TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer( + "fakeStream", new Properties(), 10, 2); + consumer.open(new Configuration()); + consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); - Field lastSequenceNumsField = FlinkKinesisConsumer.class.getDeclaredField("lastSequenceNums"); - lastSequenceNumsField.setAccessible(true); - lastSequenceNumsField.set(dummyConsumer, lastSequenceNumsToRestore); + Mockito.verify(mockedFetcher).setIsRestoringFromFailure(false); + } - Field hasAssignedShardsField = FlinkKinesisConsumer.class.getDeclaredField("hasAssignedShards"); - hasAssignedShardsField.setAccessible(true); - hasAssignedShardsField.set(dummyConsumer, hasAssignedShards); + @Test + @SuppressWarnings("unchecked") + public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception { + KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class); + PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher); - Field runningField = FlinkKinesisConsumer.class.getDeclaredField("running"); - runningField.setAccessible(true); - runningField.set(dummyConsumer, running); - } catch (IllegalAccessException | NoSuchFieldException e) { - // no reason to end up here ... - throw new RuntimeException(e); - } + // assume the given config is correct + PowerMockito.mockStatic(KinesisConfigUtil.class); + PowerMockito.doNothing().when(KinesisConfigUtil.class); - // mock FlinkKinesisConsumer utility static methods - mockStatic(FlinkKinesisConsumer.class); - mockStatic(KinesisConfigUtil.class); - - try { - // assume assignShards static method is correct by mocking - PowerMockito.when( - FlinkKinesisConsumer.assignShards( - fakeCompleteShardList, - fakeNumFlinkConsumerTasks, - fakeThisConsumerTaskIndex)) - .thenReturn(fakeAssignedShardListToThisConsumerTask); - - // assume validatePropertiesConfig static method is correct by mocking - PowerMockito.doNothing().when(KinesisConfigUtil.class, "validateConfiguration", Mockito.any(Properties.class)); - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException("Error when power mocking static methods of FlinkKinesisConsumer", e); + HashMap fakeRestoredState = new HashMap<>(); + fakeRestoredState.put( + new KinesisStreamShard("fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + new SequenceNumber(UUID.randomUUID().toString())); + fakeRestoredState.put( + new KinesisStreamShard("fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + new SequenceNumber(UUID.randomUUID().toString())); + fakeRestoredState.put( + new KinesisStreamShard("fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))), + new SequenceNumber(UUID.randomUUID().toString())); + fakeRestoredState.put( + new KinesisStreamShard("fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + new SequenceNumber(UUID.randomUUID().toString())); + fakeRestoredState.put( + new KinesisStreamShard("fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + new SequenceNumber(UUID.randomUUID().toString())); + + TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer( + "fakeStream", new Properties(), 10, 2); + consumer.restoreState(fakeRestoredState); + consumer.open(new Configuration()); + consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); + + Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true); + for (Map.Entry restoredShard : fakeRestoredState.entrySet()) { + Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream( + restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId()); + Mockito.verify(mockedFetcher).registerNewSubscribedShardState( + new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue())); } - - return dummyConsumer; } }