kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.
Date Thu, 15 Mar 2018 06:37:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399980#comment-16399980 ]

ASF GitHub Bot commented on KAFKA-6662:
---------------------------------------

wangzzu opened a new pull request #4717: KAFKA-6662: Consumer use offsetsForTimes() get offset return None.
URL: https://github.com/apache/kafka/pull/4717
 
 
   When we use the consumer's offsetsForTimes() method to look up the offset for a topic-partition, it sometimes returns null. The client log shows:
   
   ```
   [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop interval 256 upload 0 retry 0 fail 0 (com.meituan.mtrace.collector.sg.AbstractCollector)
   [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
   [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
   [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
   [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. (org.apache.kafka.clients.NetworkClient)
   [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) (org.apache.kafka.clients.NetworkClient)
   [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for org.matt_test2-0. Fetched offset -1, timestamp -1 (org.apache.kafka.clients.consumer.internals.Fetcher)
   ```
   The log shows that the broker does return an offset, but its value is -1. Fetcher.handleListOffsetResponse() drops that value:
   
   ```java
   // Handle v1 and later response
   log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
           topicPartition, partitionData.offset, partitionData.timestamp);
   if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
       OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
       timestampOffsetMap.put(topicPartition, offsetData);
   }
   ```
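
The effect of that check can be reproduced outside Kafka. The following is a minimal sketch, not the client's actual code: the class `ListOffsetFilterSketch`, the method `filterKnownOffsets`, and the use of plain maps with `{offset, timestamp}` arrays are all assumptions made for illustration. It only shows why a partition that comes back with offset -1 ends up missing from the result.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the client-side filtering; not Kafka's own classes. */
final class ListOffsetFilterSketch {
    /** Sentinel for "no offset found", mirroring the -1 seen in the log above. */
    static final long UNKNOWN_OFFSET = -1L;

    /**
     * Keeps only the partitions whose offset is known.
     * Input: partition name -> {offset, timestamp} as reported by the broker.
     */
    static Map<String, long[]> filterKnownOffsets(Map<String, long[]> response) {
        Map<String, long[]> result = new HashMap<>();
        for (Map.Entry<String, long[]> entry : response.entrySet()) {
            // Same condition as in the snippet above: -1 entries are skipped.
            if (entry.getValue()[0] != UNKNOWN_OFFSET) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

With the response from the log (`org.matt_test2-0` mapped to offset -1, timestamp -1), the filtered map has no entry for that partition, so a later `get()` on it yields null.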
   
   We tested several situations and found two cases in which it returns null:
   
   1. The topic-partition contains no messages: offsetsForTimes() gets an offset of -1.
   2. The target time is larger than the largestTimestamp of the partition's active segment: the offset is again -1.
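
Until the behaviour changes, a caller can guard against both cases by falling back to the end offset of the partition. The sketch below is an assumption about how such a guard might look, not part of this pull request: `LatestOffsetFallbackSketch` and `resolveOffsets` are hypothetical names, and plain maps keyed by partition name stand in for the results that `KafkaConsumer.offsetsForTimes()` and `KafkaConsumer.endOffsets()` would supply in a real client.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical caller-side guard; plain maps replace the real consumer results. */
final class LatestOffsetFallbackSketch {
    /**
     * For every partition in endOffsets, use the timestamp lookup result when
     * present, otherwise fall back to the partition's end (latest) offset.
     */
    static Map<String, Long> resolveOffsets(Map<String, Long> offsetsForTimes,
                                            Map<String, Long> endOffsets) {
        Map<String, Long> resolved = new HashMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            // A missing key and a null value are treated the same way.
            Long found = offsetsForTimes.get(entry.getKey());
            resolved.put(entry.getKey(), found != null ? found : entry.getValue());
        }
        return resolved;
    }
}
```

For an empty partition the end offset is 0, so the caller would start reading from the first message that arrives.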
   
   An offset of -1 is not returned to the consumer client. I think that in these situations the latest offset should be returned instead, which is also what the doc comment in kafka/core describes:
   
   ```scala
   /**
    * Search the message offset based on timestamp.
    * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
    * greater than or equals to the target timestamp.
    *
    * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
    * timestamp will be max timestamp in the segment.
    *
    * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
    * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
    *
    * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
    * from the indexed position. This could happen if the log is truncated after we get the indexed position but
    * before we scan the log from there. In this case we simply return None and the caller will need to check on
    * the truncated log and maybe retry or even do the search on another log segment.
    *
    * @param timestamp The timestamp to search for.
    * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
    *         target timestamp. None will be returned if there is no such message.
    */
   def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
     val position = index.lookup(timestampOffset.offset).position
     // Search the timestamp
     log.searchForTimestamp(timestamp, position)
   }
   ```
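
The first two rules of that doc comment can be made concrete with a small linear scan. This is only a sketch of the documented semantics under simplifying assumptions: `TimestampSearchSketch` is a hypothetical class, the segment is modelled as two parallel arrays, the time index and the Message.NoTimestamp rule are ignored, and returning null for an empty segment is a choice made here rather than something the comment specifies.

```java
/** Hypothetical model of the documented search rules; not the broker's code. */
final class TimestampSearchSketch {
    /**
     * @param offsets    message offsets in ascending order
     * @param timestamps timestamp of each message, same length as offsets
     * @param target     timestamp to search for
     * @return {offset, timestamp} of the first message whose timestamp is >= target,
     *         {lastOffset + 1, maxTimestamp} if every message is older,
     *         or null for an empty segment (an assumption of this sketch)
     */
    static long[] findOffsetByTimestamp(long[] offsets, long[] timestamps, long target) {
        if (offsets.length == 0) {
            return null;
        }
        long maxTimestamp = Long.MIN_VALUE;
        for (int i = 0; i < offsets.length; i++) {
            if (timestamps[i] >= target) {
                return new long[] {offsets[i], timestamps[i]};
            }
            maxTimestamp = Math.max(maxTimestamp, timestamps[i]);
        }
        // All messages are older than the target: report last offset + 1.
        return new long[] {offsets[offsets.length - 1] + 1, maxTimestamp};
    }
}
```

For offsets 10, 11, 12 with timestamps 100, 200, 300, a target of 150 yields offset 11, and a target of 400 yields offset 13, which is the "last offset + 1" result that case 2 above would be expected to produce according to the comment.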

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Consumer use offsetsForTimes() get offset return None.
> ------------------------------------------------------
>
>                 Key: KAFKA-6662
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6662
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.2.0
>            Reporter: Matt Wang
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
