Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BFA18200C67 for ; Mon, 15 May 2017 20:57:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BE1A4160BC2; Mon, 15 May 2017 18:57:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 10D5C160BA9 for ; Mon, 15 May 2017 20:57:39 +0200 (CEST) Received: (qmail 47101 invoked by uid 500); 15 May 2017 18:57:39 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 47092 invoked by uid 99); 15 May 2017 18:57:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 May 2017 18:57:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C35FDFFAC; Mon, 15 May 2017 18:57:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-5205: Use default values of keySerde if it is not specified by users in CachingSessionStore [Forced Update!] Date: Mon, 15 May 2017 18:57:39 +0000 (UTC) archived-at: Mon, 15 May 2017 18:57:40 -0000 Repository: kafka Updated Branches: refs/heads/0.10.2 9668b6bf6 -> 5719e8c9c (forced update) KAFKA-5205: Use default values of keySerde if it is not specified by users in CachingSessionStore CachingSessionStore wasn't properly using the default keySerde if no Serde was supplied. I saw the below error in the logs for one of my test cases. Author: Kyle Winkelman Reviewers: Damian Guy, Guozhang Wang Closes #2963 from KyleWinkelman/CachingSessionStore-fix-keySerde-use (cherry picked from commit 475cc2544e18b6b321e716691648024cdbbafb16) Signed-off-by: Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5719e8c9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5719e8c9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5719e8c9 Branch: refs/heads/0.10.2 Commit: 5719e8c9c07d7580a8806fddbb7135bd56be9e0c Parents: c15b93f Author: Kyle Winkelman Authored: Mon May 15 11:51:10 2017 -0700 Committer: Guozhang Wang Committed: Mon May 15 11:56:40 2017 -0700 ---------------------------------------------------------------------- .../kafka/streams/state/internals/CachingSessionStore.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5719e8c9/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index ed64246..80160b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -60,7 +60,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractWrappedState final long earliestSessionEndTime, final long latestSessionStartTime) { validateStoreOpen(); - final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key)); + final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, keySchema.lowerRange(binarySessionId, earliestSessionEndTime).get(), @@ -81,7 +81,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractWrappedState public void put(final Windowed key, AGG value) { validateStoreOpen(); - final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic); + final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), key.window().end(), context.partition(), context.topic()); cache.put(cacheName, binaryKey.get(), entry); @@ -127,7 +127,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractWrappedState final RecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { - final Windowed key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic); + final Windowed key = SessionKeySerde.from(binaryKey.get(), serdes.keyDeserializer(), topic); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); final AGG oldValue = fetchPrevious(binaryKey);