kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.
Date Wed, 21 Mar 2018 15:06:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16408054#comment-16408054 ]

ASF GitHub Bot commented on KAFKA-6662:
---------------------------------------

wangzzu closed pull request #4717: KAFKA-6662: Consumer use offsetsForTimes() get offset return None.
URL: https://github.com/apache/kafka/pull/4717
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f0050f54aef..33035f9a9c9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1120,7 +1120,10 @@ class Log(@volatile var dir: File,
           None
       }
 
-      targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
+      targetSeg match {
+        case None => Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, this.logEndOffset))
+        case _ => targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5970f42f6d9..52740d49c79 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -479,8 +479,12 @@ class LogSegment private[log] (val log: FileRecords,
     val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position
 
     // Search the timestamp
-    Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
-      TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
+    if (position == 0 && timestampOffset.timestamp == -1){
+      Option(timestampOffset)
+    } else {
+      Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
+        TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
+      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index c45ed0d2986..946ef91c3de 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -263,7 +263,7 @@ class LogSegmentTest {
     assertEquals(43, seg.findOffsetByTimestamp(430).get.offset)
     assertEquals(44, seg.findOffsetByTimestamp(431).get.offset)
     // Search beyond the last timestamp
-    assertEquals(None, seg.findOffsetByTimestamp(491))
+    assertEquals(49, seg.findOffsetByTimestamp(491).get.offset)
     // Search before the first indexed timestamp
     assertEquals(41, seg.findOffsetByTimestamp(401).get.offset)
     // Search before the first timestamp
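Taken together, the patch changes the "search beyond the last timestamp" case from returning None to returning the log end offset. A minimal stand-alone sketch of that lookup rule (illustrative names and a plain array index, not the actual Kafka classes):

```java
/**
 * Toy model of the patched lookup: timestamps[i] is the timestamp of the
 * message at offset i. Class and method names are illustrative only.
 */
public class OffsetByTimestamp {
    static final long NO_TIMESTAMP = -1L;

    /**
     * Returns {timestamp, offset} of the first message whose timestamp is
     * >= target, or {NO_TIMESTAMP, logEndOffset} when every timestamp is
     * smaller -- the behaviour the patch introduces instead of "None".
     */
    static long[] find(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target)
                return new long[] { timestamps[offset], offset };
        }
        // Patched path: search beyond the last timestamp -> log end offset.
        return new long[] { NO_TIMESTAMP, timestamps.length };
    }

    public static void main(String[] args) {
        long[] ts = { 400, 410, 430, 490 };       // offsets 0..3
        System.out.println(find(ts, 420)[1]);     // 2: first timestamp >= 420
        System.out.println(find(ts, 491)[1]);     // 4: log end offset, not "None"
    }
}
```

This mirrors the updated LogSegmentTest expectation above: a search beyond the last timestamp now yields a concrete offset instead of None.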


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Consumer use offsetsForTimes() get offset return None.
> ------------------------------------------------------
>
>                 Key: KAFKA-6662
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6662
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.2.0
>            Reporter: Matt Wang
>            Priority: Minor
>
> When we use the Consumer's offsetsForTimes() method to get the topic-partition offset, it sometimes returns null. Printing the client log:
> {code:java}
> [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop interval 256 upload 0 retry 0 fail 0 (com.meituan.mtrace.collector.sg.AbstractCollector)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for org.matt_test2-0. Fetched offset -1, timestamp -1 (org.apache.kafka.clients.consumer.internals.Fetcher){code}
> From the log we can see that the broker does return an offset, but its value is -1, and that value is then discarded in Fetcher.handleListOffsetResponse():
> {code:java}
> // Handle v1 and later response
> log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
>         topicPartition, partitionData.offset, partitionData.timestamp);
> if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
>     OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
>     timestampOffsetMap.put(topicPartition, offsetData);
> }{code}
> We tested several situations and found that it returns null in the following two cases:
>  # The topic-partition contains no messages; when we use offsetsForTimes() to get the offset, the broker returns -1.
>  # The targetTime used to find the offset is larger than the partition's active segment's largestTimestamp; again the broker returns -1.
> If the offset is -1, it is not returned to the consumer client. I think in these situations the latest offset should be returned, which is also what the kafka/core doc comment specifies:
> {code:java}
> /**
>  * Search the message offset based on timestamp.
>  * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
>  * greater than or equals to the target timestamp.
>  *
>  * If all the messages in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
>  * timestamp will be max timestamp in the segment.
>  *
>  * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
>  * the returned offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
>  *
>  * This method only returns None when the log is not empty but we did not see any messages when scanning the log
>  * from the indexed position. This could happen if the log is truncated after we get the indexed position but
>  * before we scan the log from there. In this case we simply return None and the caller will need to check on
>  * the truncated log and maybe retry or even do the search on another log segment.
>  *
>  * @param timestamp The timestamp to search for.
>  * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
>  *         target timestamp. None will be returned if there is no such message.
>  */
> def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
>   // Get the index entry with a timestamp less than or equal to the target timestamp
>   val timestampOffset = timeIndex.lookup(timestamp)
>   val position = index.lookup(timestampOffset.offset).position
>   // Search the timestamp
>   log.searchForTimestamp(timestamp, position)
> }
> {code}
>  
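To illustrate why the client ends up seeing null: the Fetcher code quoted in the report skips any partition whose fetched offset equals UNKNOWN_OFFSET (-1), so the map that offsetsForTimes() builds simply has no entry for that partition. A minimal stand-alone sketch of that filtering step, using plain Java maps rather than the real Kafka client types (names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class ListOffsetFilter {
    static final long UNKNOWN_OFFSET = -1L;

    /**
     * Mimics the quoted Fetcher.handleListOffsetResponse logic: offsets
     * equal to UNKNOWN_OFFSET are never put into the result map, so the
     * caller of offsetsForTimes() sees null for that partition.
     */
    static Map<String, Long> collect(Map<String, Long> fetched) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : fetched.entrySet()) {
            if (e.getValue() != UNKNOWN_OFFSET)
                result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> fetched = new HashMap<>();
        fetched.put("matt_test2-0", -1L);   // broker answered, but with -1
        fetched.put("matt_test2-1", 42L);
        Map<String, Long> visible = collect(fetched);
        System.out.println(visible.get("matt_test2-0")); // null
        System.out.println(visible.get("matt_test2-1")); // 42
    }
}
```

This is why the two reported cases (empty partition, targetTime past the active segment's largest timestamp) surface to the application as null rather than as an offset of -1.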



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
