flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
Date Tue, 15 May 2018 17:49:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476245#comment-16476245 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377393
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
     	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
     	 * @return the result of the describe stream operation
     	 */
    -	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -		describeStreamRequest.setStreamName(streamName);
    -		describeStreamRequest.setExclusiveStartShardId(startShardId);
    +	private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +																			@Nullable String startNextToken)
    +			throws InterruptedException {
    +		final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +		if (startNextToken == null) {
    +			listShardsRequest.setExclusiveStartShardId(startShardId);
    +			listShardsRequest.setStreamName(streamName);
    +		} else {
    +			// Note the nextToken returned by AWS expires within 300 sec.
    +			listShardsRequest.setNextToken(startNextToken);
    +		}
     
    -		DescribeStreamResult describeStreamResult = null;
    +		ListShardsResult listShardsResults = null;
     
    -		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
     		int attemptCount = 0;
    -		while (describeStreamResult == null) { // retry until we get a result
    +		// List Shards returns just the first 1000 shard entries. Make sure that all entries
    +		// are taken up.
    +		while (listShardsResults == null) { // retry until we get a result
     			try {
    -				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +				listShardsResults = kinesisClient.listShards(listShardsRequest);
     			} catch (LimitExceededException le) {
     				long backoffMillis = fullJitterBackoff(
    -					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -					+ backoffMillis + " millis.");
    +						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +									+ ". Backing off for " + backoffMillis + " millis.");
     				Thread.sleep(backoffMillis);
    -			} catch (ResourceNotFoundException re) {
    -				throw new RuntimeException("Error while getting stream details", re);
    -			}
    -		}
    -
    -		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -			if (LOG.isWarnEnabled()) {
    -				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -					"describeStream operation will not contain any shard information.");
    +			} catch (ResourceInUseException reInUse) {
    +				if (LOG.isWarnEnabled()) {
    +					// List Shards will throw an exception if stream in not in active state. Will return
    +					LOG.warn("The stream is currently not in active state. Reusing the older state "
    +							+ "for the time being");
    +					break;
    +				}
    +			} catch (ResourceNotFoundException reNotFound) {
    +				throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
    +			} catch (InvalidArgumentException inArg) {
    +				throw new RuntimeException("Invalid Arguments to listShards.", inArg);
    +			} catch (ExpiredNextTokenException expiredToken) {
    +				LOG.warn("List Shards has an expired token. Reusing the previous state.");
    +				break;
     			}
     		}
    -
    -		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
    -		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
    -		if (startShardId != null) {
    -			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
    +		// the exclusive start shard id in the returned shards list; check if we need to remove
    +		// these erroneously returned shards.
    +		if (startShardId != null && listShardsResults != null) {
    +			List<Shard> shards = listShardsResults.getShards();
     			Iterator<Shard> shardItr = shards.iterator();
    -			while (shardItr.hasNext()) {
    +			while (shardItr.hasNext()){
    --- End diff --
    
    Done. 
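The comment in the diff notes that ListShards returns at most 1000 shard entries per call, so a caller has to keep following `NextToken` until it is absent. The diff also sets either the stream name (plus optional exclusive start shard id) or the token on a request, never both. A minimal sketch of that paging loop follows; `Page`, `ShardPager` and `listAllShards` are hypothetical stand-ins, not the AWS SDK or Flink's `KinesisProxy`.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardPagination {

    /** One page of shard ids; nextToken is null on the last page. */
    static final class Page {
        final List<String> shardIds;
        final String nextToken;

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Hypothetical stand-in for the two request shapes used in the diff. */
    interface ShardPager {
        /** First request: stream name plus optional exclusive start shard id. */
        Page firstPage(String streamName, String exclusiveStartShardId);

        /** Follow-up request: only the token from the previous page. */
        Page nextPage(String nextToken);
    }

    /** Follows nextToken until the service reports that no pages remain. */
    static List<String> listAllShards(ShardPager pager, String streamName, String exclusiveStartShardId) {
        List<String> all = new ArrayList<>();
        Page page = pager.firstPage(streamName, exclusiveStartShardId);
        all.addAll(page.shardIds);
        while (page.nextToken != null) {
            // Tokens expire (300 s per the comment in the diff), so request the next page right away.
            page = pager.nextPage(page.nextToken);
            all.addAll(page.shardIds);
        }
        return all;
    }
}
```

If a token does expire anyway, the diff's `ExpiredNextTokenException` branch gives up and reuses the previous shard state instead of restarting from the stream name.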

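The retry loop in the diff retries `LimitExceededException` with full-jitter backoff and treats `ResourceInUseException` and `ExpiredNextTokenException` as signals to keep the previous shard state. The sketch below shows that control flow, assuming the usual full-jitter formula (a uniform delay in [0, min(max, base * power^attempt)), where `power` plays the role of the `...ExpConstant` parameter in the diff). The exception classes, `Sleeper` and `callWithRetry` are hypothetical stand-ins, not the AWS SDK types. One difference is deliberate: in the diff, the `break` for `ResourceInUseException` sits inside `if (LOG.isWarnEnabled())`, so with WARN logging disabled the loop would retry immediately, without backoff. The sketch leaves the loop unconditionally.

```java
import java.util.Random;
import java.util.function.Supplier;

final class ListShardsRetry {
    // Local stand-ins for the AWS SDK exceptions named in the diff.
    static final class LimitExceeded extends RuntimeException {}
    static final class ResourceInUse extends RuntimeException {}
    static final class ExpiredNextToken extends RuntimeException {}

    /** Injected so tests can record delays instead of sleeping. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /** Full jitter: uniform in [0, min(max, base * power^attempt)). */
    static long fullJitterBackoff(Random rnd, long base, long max, double power, int attempt) {
        long cap = (long) Math.min(max, base * Math.pow(power, attempt));
        return (long) (rnd.nextDouble() * cap);
    }

    /** Returns the result, or null when the caller should reuse its previous shard state. */
    static <T> T callWithRetry(Supplier<T> call, Sleeper sleeper, Random rnd,
                               long base, long max, double power) throws InterruptedException {
        int attempt = 0;
        while (true) {
            try {
                return call.get();
            } catch (LimitExceeded e) {
                // Throttled: wait a random slice of the growing window, then retry.
                sleeper.sleep(fullJitterBackoff(rnd, base, max, power, attempt++));
            } catch (ResourceInUse | ExpiredNextToken e) {
                // Exit regardless of log level; logging must not gate this decision.
                return null;
            }
        }
    }
}
```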

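The last hunk of the diff keeps the Kinesalite workaround (drop any returned shard that is not strictly after the exclusive start shard id), now also guarded by `listShardsResults != null` because the `break` paths leave the result null. A minimal sketch of that filter, assuming shard ids of the form `shardId-<digits>` that order by their numeric suffix; this id format and the helper names are assumptions for illustration.

```java
import java.util.List;

final class ExclusiveStartFilter {

    /** Parses the numeric suffix, e.g. "shardId-000000000007" -> 7 (assumed id format). */
    static long shardNumber(String shardId) {
        return Long.parseLong(shardId.substring(shardId.lastIndexOf('-') + 1));
    }

    /** Removes, in place, every shard id at or before startShardId. */
    static void removeUpTo(List<String> shardIds, String startShardId) {
        if (startShardId == null) {
            return; // first listing: nothing to exclude
        }
        long start = shardNumber(startShardId);
        shardIds.removeIf(id -> shardNumber(id) <= start);
    }
}
```

`removeIf` does the same job as the explicit `Iterator.remove()` loop in the diff; either works on a mutable list.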
> Use ListShards for shard discovery in the flink kinesis connector
> -----------------------------------------------------------------
>
>                 Key: FLINK-8944
>                 URL: https://issues.apache.org/jira/browse/FLINK-8944
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Kailash Hassan Dayanand
>            Priority: Minor
>
> Currently the DescribeStream AWS API, which is used to get the list of shards, has a restrictive
> rate limit on AWS (5 requests per second per account). This is problematic when running multiple
> Flink jobs on the same account, since each subtask calls DescribeStream. Switching to ListShards
> will provide more headroom on rate limits, as ListShards has a limit of 100 requests per second
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh
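The limits quoted in the issue translate into a rough lower bound on how often subtasks can list shards. A minimal sketch, assuming each subtask issues exactly one list call per discovery round (no pagination, no retries) and that calls are spread evenly; `minIntervalSeconds` is a hypothetical helper.

```java
final class DiscoveryRate {

    /**
     * If `callers` clients each make one call per interval T, the aggregate
     * rate is callers / T, which must not exceed the limit; so T >= callers / limit.
     */
    static double minIntervalSeconds(int callers, double limitPerSecond) {
        if (limitPerSecond <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        return callers / limitPerSecond;
    }
}
```

For example, 50 subtasks sharing an account-wide limit of 5 requests per second could each list shards at most once every 10 seconds, while 50 subtasks reading one stream under a per-stream limit of 100 requests per second could list every 0.5 seconds. Because the ListShards limit is per stream, jobs reading different streams also stop competing for the same budget.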



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
