flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3102) Allow reading from multiple topics with one FlinkKafkaConsumer
Date Mon, 07 Dec 2015 16:01:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045128#comment-15045128
] 

ASF GitHub Bot commented on FLINK-3102:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1437#discussion_r46837508
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
---
    @@ -563,37 +559,44 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception
{
     					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
     					return;
     				}
    -	
    -				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
    +
    +				//noinspection unchecked
    +				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
    +
     				
     				// remove older checkpoints in map
     				for (int i = 0; i < posInMap; i++) {
     					pendingCheckpoints.remove(0);
     				}
     			}
    -	
    -			if (LOG.isInfoEnabled()) {
    -				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets),
offsetStore);
    +			if(checkpointOffsets.size() == 0) {
    +				LOG.info("Checkpoint state was empty.");
    +				return;
     			}
     	
     			// build the map of (topic,partition) -> committed offset
    -			Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    -			for (TopicPartition tp : subscribedPartitions) {
    -				
    -				int partition = tp.partition();
    -				long offset = checkpointOffsets[partition];
    -				long lastCommitted = commitedOffsets[partition];
    -				
    +			Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
    +			for (KafkaTopicPartition tp : subscribedPartitions) {
    +				Long offset = checkpointOffsets.get(tp);
    +				Long lastCommitted = commitedOffsets.get(tp);
    +				if(lastCommitted == null) {
    +					lastCommitted = OFFSET_NOT_SET;
    +				}
     				if (offset != OFFSET_NOT_SET) {
     					if (offset > lastCommitted) {
     						offsetsToCommit.put(tp, offset);
    -						LOG.debug("Committing offset {} for partition {}", offset, partition);
    +						//noinspection unchecked
    --- End diff --
    
    Inspection suppression should not be needed...


> Allow reading from multiple topics with one FlinkKafkaConsumer
> --------------------------------------------------------------
>
>                 Key: FLINK-3102
>                 URL: https://issues.apache.org/jira/browse/FLINK-3102
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Currently, a Kafka consumer allows to read from only one topic.
> For cases where multiple topics contain messages with the same schema, it is useful to
allow to subscribe to many topics using one FlinkKafkaConsumer instance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message