kafka-dev mailing list archives

From "Shawn Nguyen (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-7110) Windowed changelog keys not deserialized properly by TimeWindowedSerde
Date Wed, 27 Jun 2018 19:23:00 GMT
Shawn Nguyen created KAFKA-7110:

             Summary: Windowed changelog keys not deserialized properly by TimeWindowedSerde
                 Key: KAFKA-7110
                 URL: https://issues.apache.org/jira/browse/KAFKA-7110
             Project: Kafka
          Issue Type: Bug
            Reporter: Shawn Nguyen
             Fix For: 1.1.0

Currently, TimeWindowedSerde does not properly deserialize windowed keys from a changelog
topic. A few assumptions made in TimeWindowedDeserializer prevent the changelog windowed
keys from being deserialized correctly.

1) In the from method of WindowKeySchema (called from deserialize in TimeWindowedDeserializer),
we extract the window from the binary key by calling getLong(binaryKey.length - TIMESTAMP_SIZE).
However, ChangeLoggingWindowBytesStore logs the windowed key to the changelog as:

changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, maybeUpdateSeqnumForDups()),

In toStoreKeyBinary, the key is written into a buffer allocated as

final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);

which also holds the seqnum (used for de-duping). The deserializer, however, does not assume
that the windowed changelog key carries a seqnum, so the extracted window contains gibberish
values because the bytes it reads are not aligned with the timestamp.
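
The misalignment can be reproduced outside Kafka with a minimal sketch. The class and
method names below are hypothetical, and the sizes (8 bytes for the timestamp, 4 bytes for
the seqnum) are assumptions that match a long and an int; only the buffer layout is taken
from the allocation shown above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ChangelogKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // assumed: one long
    static final int SEQNUM_SIZE = 4;    // assumed: one int

    // Layout described in the report: [serialized key][timestamp][seqnum].
    static byte[] toStoreKeyBinary(byte[] serializedKey, long timestamp, int seqnum) {
        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(timestamp);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Offset used when the key is assumed to end with the timestamp (no seqnum).
    static long readStartAssumingNoSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Offset that skips the trailing seqnum before reading the timestamp.
    static long readStartWithSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        long windowStart = 1530126180000L;
        byte[] binaryKey = toStoreKeyBinary(key, windowStart, 0);

        System.out.println("written start:       " + windowStart);
        System.out.println("seqnum-aware read:   " + readStartWithSeqnum(binaryKey));
        System.out.println("seqnum-unaware read: " + readStartAssumingNoSeqnum(binaryKey));
    }
}
```

The seqnum-unaware read spans the last four bytes of the timestamp plus the four seqnum
bytes, which is why the resulting window start bears no relation to the value written.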

The fix here is to introduce a new Serde in WindowSerdes that explicitly handles windowed
changelog input topics.


2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to Long.MAX_VALUE:

// TODO: fix this part as last bits of KAFKA-4468
public TimeWindowedDeserializer(final Deserializer<T> inner) {
    this(inner, Long.MAX_VALUE);
}

public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
    this.inner = inner;
    this.windowSize = windowSize;
}

This will cause the end times to be gibberish when we extract the window, since the
windowSize is combined with the start time to compute the end time in:

public static <K> Windowed<K> from(final byte[] binaryKey,
                                   final long windowSize,
                                   final Deserializer<K> deserializer,
                                   final String topic) {
    final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
    System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
    final K key = deserializer.deserialize(topic, bytes);
    final Window window = extractWindow(binaryKey, windowSize);
    return new Windowed<>(key, window);
}

private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
    final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
    final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
    return timeWindowForSize(start, windowSize);
}

So in the new serde, we will make windowSize a constructor param that can be supplied.
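
To illustrate why the default matters, here is a rough sketch of how the window end depends
on the supplied size. The class WindowSizeSketch and its windowFor method are hypothetical
stand-ins, and the sketch assumes that the end is computed as start + windowSize and clamped
to Long.MAX_VALUE on overflow; the body of timeWindowForSize is not shown in this report.

```java
public class WindowSizeSketch {
    // Hypothetical stand-in for timeWindowForSize.
    // Assumption: end = start + windowSize, clamped to Long.MAX_VALUE on overflow.
    static long[] windowFor(long startMs, long windowSize) {
        long endMs = startMs + windowSize;
        if (endMs < 0) {
            endMs = Long.MAX_VALUE; // overflow: the real window end is lost
        }
        return new long[] {startMs, endMs};
    }

    public static void main(String[] args) {
        long start = 1530126180000L;

        // Default used by the single-argument constructor: windowSize = Long.MAX_VALUE.
        long[] withDefault = windowFor(start, Long.MAX_VALUE);

        // Size supplied by the caller, e.g. a one-minute window.
        long[] withSize = windowFor(start, 60_000L);

        System.out.println("default size end:  " + withDefault[1]);
        System.out.println("supplied size end: " + withSize[1]);
    }
}
```

Under these assumptions, only the caller-supplied size yields an end time that reflects the
actual window, which is the motivation for making windowSize a constructor parameter.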



This message was sent by Atlassian JIRA
