flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4821) Implement rescalable non-partitioned state for Kinesis Connector
Date Sat, 22 Apr 2017 04:31:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979752#comment-15979752 ]

ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3001#discussion_r112802181
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -267,38 +289,77 @@ public void close() throws Exception {
     	// ------------------------------------------------------------------------
     
     	@Override
    -	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    +	public void snapshotState(FunctionSnapshotContext context) throws Exception {
     		if (lastStateSnapshot == null) {
     			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
    -			return null;
    -		}
    -
    -		if (fetcher == null) {
    +		} else if (fetcher == null) {
     			LOG.debug("snapshotState() requested on not yet running source; returning null.");
    -			return null;
    -		}
    -
    -		if (!running) {
    +		} else if (!running) {
     			LOG.debug("snapshotState() called on closed source; returning null.");
    -			return null;
    -		}
    +		} else {
    +			if (LOG.isDebugEnabled()) {
    +				LOG.debug("Snapshotting state ...");
    +			}
     
    -		if (LOG.isDebugEnabled()) {
    -			LOG.debug("Snapshotting state ...");
    -		}
    +			sequenceNumsStateForCheckpoint.clear();
    +			lastStateSnapshot = fetcher.snapshotState();
     
    -		lastStateSnapshot = fetcher.snapshotState();
    +			if (LOG.isDebugEnabled()) {
    +				LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
    +					lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
    +			}
     
    -		if (LOG.isDebugEnabled()) {
    -			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
    -				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
    +			for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
    +				sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
    +			}
     		}
    +	}
     
    -		return lastStateSnapshot;
    +	@Override
    +	public void initializeState(FunctionInitializationContext context) throws Exception {
    +		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>(
    +			TypeInformation.of(KinesisStreamShard.class),
    +			TypeInformation.of(SequenceNumber.class)
    +		);
    +
    +		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
    +			new ListStateDescriptor<>("Kinesis-Stream-Shard-State", tuple));
    --- End diff --
    
    Can we make this state name a `static final String` constant, with a comment noting that it must not be changed?
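
One way to read this suggestion, as a sketch rather than the code that was eventually merged: hoist the name into a constant and document why it is frozen. Flink matches restored operator state by the name given in the state descriptor, so renaming it would leave state from older checkpoints unmatched. The constant name `SEQUENCE_NUMS_STATE_NAME` and the `FakeStateStore` class below are hypothetical stand-ins so the example runs without Flink on the classpath; only the string `Kinesis-Stream-Shard-State` comes from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StateNameSketch {

    /**
     * Name under which the shard-to-sequence-number state is registered.
     * Restored state is looked up by this name, so it MUST NOT be changed;
     * a different value would no longer match state written by earlier versions.
     * (Constant name is an assumption; the string value is taken from the diff.)
     */
    static final String SEQUENCE_NUMS_STATE_NAME = "Kinesis-Stream-Shard-State";

    /** Hypothetical stand-in for an operator state store that keys list state by name. */
    static final class FakeStateStore {
        private final Map<String, List<String>> statesByName = new HashMap<>();

        /** Returns the list registered under the given name, creating an empty one if absent. */
        List<String> getListState(String name) {
            return statesByName.computeIfAbsent(name, k -> new ArrayList<>());
        }
    }

    public static void main(String[] args) {
        FakeStateStore store = new FakeStateStore();

        // "Checkpoint": write one entry under the constant name.
        store.getListState(SEQUENCE_NUMS_STATE_NAME).add("shard-0@seq-42");

        // "Restore" with the same name finds the entry.
        System.out.println("same name:    " + store.getListState(SEQUENCE_NUMS_STATE_NAME));

        // "Restore" with a different name sees nothing, which is the failure the comment guards against.
        System.out.println("renamed name: " + store.getListState("Renamed-State"));
    }
}
```

In the actual consumer the constant would be passed to the `ListStateDescriptor` constructor in place of the inline string literal.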


> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement it too. This ticket tracks progress for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
