kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: Add missing generics and surpress warning annotations (#4518)
Date Thu, 08 Feb 2018 21:22:09 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1352f8  MINOR: Add missing generics and surpress warning annotations (#4518)
a1352f8 is described below

commit a1352f8c5a96c8e861c17c8105557428a47f4334
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Thu Feb 8 13:21:56 2018 -0800

    MINOR: Add missing generics and surpress warning annotations (#4518)
    
    Author: Matthias J. Sax <matthias@confluent.io>
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../java/org/apache/kafka/streams/Topology.java    |  18 +-
 .../streams/kstream/internals/AbstractStream.java  |   1 +
 .../kstream/internals/KGroupedTableImpl.java       |   5 +-
 .../streams/state/internals/RocksDBStore.java      | 182 ++++++++++-----------
 .../RocksDbKeyValueBytesStoreSupplier.java         |   5 +-
 .../kafka/streams/state/internals/Segment.java     |   7 +-
 .../org/apache/kafka/streams/TopologyTest.java     |   2 +-
 .../streams/state/internals/RocksDBStoreTest.java  | 128 +++++++++++----
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 .../kafka/streams/test/ConsumerRecordFactory.java  |   2 +-
 .../kafka/streams/TopologyTestDriverTest.java      |   1 +
 11 files changed, 200 insertions(+), 155 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 3b1ac6d..c137a30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -448,10 +448,10 @@ public class Topology {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized Topology addSink(final String name,
-                                         final String topic,
-                                         final StreamPartitioner partitioner,
-                                         final String... parentNames) {
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final String topic,
+                                                final StreamPartitioner<? super K, ? super
V> partitioner,
+                                                final String... parentNames) {
         internalTopologyBuilder.addSink(name, topic, null, null, partitioner, parentNames);
         return this;
     }
@@ -476,11 +476,11 @@ public class Topology {
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized Topology addSink(final String name,
-                                         final String topic,
-                                         final Serializer keySerializer,
-                                         final Serializer valueSerializer,
-                                         final String... parentNames) {
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final String topic,
+                                                final Serializer<K> keySerializer,
+                                                final Serializer<V> valueSerializer,
+                                                final String... parentNames) {
         internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, null,
parentNames);
         return this;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 41cdec2..7410a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -136,6 +136,7 @@ public abstract class AbstractStream<K> {
             public InternalValueTransformerWithKey<K, V, VR> get() {
                 final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
                 return new InternalValueTransformerWithKey<K, V, VR>() {
+                    @SuppressWarnings("deprecation")
                     @Override
                     public VR punctuate(final long timestamp) {
                         return valueTransformer.punctuate(timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index d5a4e71..6e33251 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
@@ -49,7 +48,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K>
implements KGroup
 
     protected final Serde<K> keySerde;
     protected final Serde<V> valSerde;
-    private boolean isQueryable = true;
+    private boolean isQueryable;
     private final Initializer<Long> countInitializer = new Initializer<Long>()
{
         @Override
         public Long apply() {
@@ -142,7 +141,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K>
implements KGroup
     @SuppressWarnings("deprecation")
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>>
aggregateSupplier,
                                          final String functionName,
-                                         final StateStoreSupplier<KeyValueStore> storeSupplier)
{
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore>
storeSupplier) {
         final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
         final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 67ec915..a2e45e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -28,11 +27,9 @@ import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallba
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -66,12 +63,9 @@ import java.util.Set;
  * If you intend to work on byte arrays as key, for example, you may want to wrap them with
the {@code Bytes} class,
  * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[],
...>}.
  *
- * @param <K> The key type
- * @param <V> The value type
- *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
     private static final int TTL_NOT_USED = -1;
 
@@ -89,10 +83,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
     private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new
HashSet<KeyValueIterator>());
 
     File dbDir;
-    private StateSerdes<K, V> serdes;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-
     private RocksDB db;
 
     // the following option objects will be created in the constructor and closed in the
close() method
@@ -107,19 +97,17 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
 
     protected volatile boolean open = false;
 
-    RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
-        this(name, DB_FILE_DIR, keySerde, valueSerde);
+    RocksDBStore(String name) {
+        this(name, DB_FILE_DIR);
     }
 
-    RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde)
{
+    RocksDBStore(String name, String parentDir) {
         this.name = name;
         this.parentDir = parentDir;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
     }
 
     @SuppressWarnings("unchecked")
-    public void openDB(ProcessorContext context) {
+    public void openDB(final ProcessorContext context) {
         // initialize the default rocksdb options
         final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
         tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
@@ -161,13 +149,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
             final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
             configSetter.setConfig(name, options, configs);
         }
-        // we need to construct the serde while opening DB since
-        // it is also triggered by windowed DB segments without initialization
-        this.serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
 
         try {
@@ -179,7 +160,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
         open = true;
     }
 
-    public void init(ProcessorContext context, StateStore root) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
         // open the DB dir
         this.internalProcessorContext = context;
         openDB(context);
@@ -190,7 +172,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
         context.register(root, false, this.batchingStateRestoreCallback);
     }
 
-    private RocksDB openDB(File dir, Options options, int ttl) throws IOException {
+    private RocksDB openDB(final File dir,
+                           final Options options,
+                           final int ttl) throws IOException {
         try {
             if (ttl == TTL_NOT_USED) {
                 Files.createDirectories(dir.getParentFile().toPath());
@@ -200,7 +184,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
                 // TODO: support TTL with change log?
                 // return TtlDB.open(options, dir.toString(), ttl, false);
             }
-        } catch (RocksDBException e) {
+        } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error opening store " + this.name + " at location
" + dir.toString(), e);
         }
     }
@@ -226,10 +210,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
     }
 
     @Override
-    public synchronized V get(K key) {
+    public synchronized byte[] get(final Bytes key) {
         validateStoreOpen();
-        byte[] byteValue = getInternal(serdes.rawKey(key));
-        return byteValue  == null ? null : serdes.valueFrom(byteValue);
+        return getInternal(key.get());
     }
 
     private void validateStoreOpen() {
@@ -238,23 +221,22 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
         }
     }
 
-    private byte[] getInternal(byte[] rawKey) {
+    private byte[] getInternal(final byte[] rawKey) {
         try {
             return this.db.get(rawKey);
-        } catch (RocksDBException e) {
-            throw new ProcessorStateException("Error while getting value for key " + serdes.keyFrom(rawKey)
+
-                    " from store " + this.name, e);
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while getting value for key from store
" + this.name, e);
         }
     }
 
-    private void toggleDbForBulkLoading(boolean prepareForBulkload) {
+    private void toggleDbForBulkLoading(final boolean prepareForBulkload) {
 
         if (prepareForBulkload) {
             // if the store is not empty, we need to compact to get around the num.levels
check
             // for bulk loading
             final String[] sstFileNames = dbDir.list(new FilenameFilter() {
                 @Override
-                public boolean accept(File dir, String name) {
+                public boolean accept(final File dir, final String name) {
                     return name.matches(".*\\.sst");
                 }
             });
@@ -262,7 +244,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
             if (sstFileNames != null && sstFileNames.length > 0) {
                 try {
                     this.db.compactRange(true, 1, 0);
-                } catch (RocksDBException e) {
+                } catch (final RocksDBException e) {
                     throw new ProcessorStateException("Error while range compacting during
restoring  store " + this.name, e);
                 }
 
@@ -280,27 +262,27 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public synchronized void put(K key, V value) {
+    public synchronized void put(final Bytes key,
+                                 final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        byte[] rawKey = serdes.rawKey(key);
-        byte[] rawValue = serdes.rawValue(value);
-        putInternal(rawKey, rawValue);
+        putInternal(key.get(), value);
     }
 
     @Override
-    public synchronized V putIfAbsent(K key, V value) {
+    public synchronized byte[] putIfAbsent(final Bytes key,
+                                           final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
-        V originalValue = get(key);
+        final byte[] originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
         }
         return originalValue;
     }
 
-    private void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> records)
{
-        try (WriteBatch batch = new WriteBatch()) {
-            for (KeyValue<byte[], byte[]> record : records) {
+    private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>>
records) {
+        try (final WriteBatch batch = new WriteBatch()) {
+            for (final KeyValue<byte[], byte[]> record : records) {
                 if (record.value == null) {
                     batch.remove(record.key);
                 } else {
@@ -308,98 +290,96 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
                 }
             }
             db.write(wOptions, batch);
-        } catch (RocksDBException e) {
+        } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error restoring batch to store " + this.name,
e);
         }
     }
 
-    private void putInternal(byte[] rawKey, byte[] rawValue) {
+    private void putInternal(final byte[] rawKey,
+                             final byte[] rawValue) {
         if (rawValue == null) {
             try {
                 db.delete(wOptions, rawKey);
-            } catch (RocksDBException e) {
-                throw new ProcessorStateException("Error while removing key " + serdes.keyFrom(rawKey)
+
-                        " from store " + this.name, e);
+            } catch (final RocksDBException e) {
+                throw new ProcessorStateException("Error while removing key from store "
+ this.name, e);
             }
         } else {
             try {
                 db.put(wOptions, rawKey, rawValue);
-            } catch (RocksDBException e) {
-                throw new ProcessorStateException("Error while executing put key " + serdes.keyFrom(rawKey)
+
-                        " and value " + serdes.keyFrom(rawValue) + " from store " + this.name,
e);
+            } catch (final RocksDBException e) {
+                throw new ProcessorStateException("Error while executing putting key/value
into store " + this.name, e);
             }
         }
     }
 
     @Override
-    public void putAll(List<KeyValue<K, V>> entries) {
-        try (WriteBatch batch = new WriteBatch()) {
-            for (KeyValue<K, V> entry : entries) {
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        try (final WriteBatch batch = new WriteBatch()) {
+            for (final KeyValue<Bytes, byte[]> entry : entries) {
                 Objects.requireNonNull(entry.key, "key cannot be null");
-                final byte[] rawKey = serdes.rawKey(entry.key);
-                final byte[] rawValue = serdes.rawValue(entry.value);
-                if (rawValue == null) {
-                    batch.remove(rawKey);
+                if (entry.value == null) {
+                    batch.remove(entry.key.get());
                 } else {
-                    batch.put(rawKey, rawValue);
+                    batch.put(entry.key.get(), entry.value);
                 }
             }
             db.write(wOptions, batch);
-        } catch (RocksDBException e) {
+        } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to store " + this.name,
e);
         }
 
     }
 
     @Override
-    public synchronized V delete(K key) {
+    public synchronized byte[] delete(final Bytes key) {
         Objects.requireNonNull(key, "key cannot be null");
-        V value = get(key);
+        final byte[] value = get(key);
         put(key, null);
         return value;
     }
 
     @Override
-    public synchronized KeyValueIterator<K, V> range(K from, K to) {
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                              final Bytes to) {
         Objects.requireNonNull(from, "from cannot be null");
         Objects.requireNonNull(to, "to cannot be null");
         validateStoreOpen();
 
         // query rocksdb
-        final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name,
db.newIterator(), serdes, from, to);
+        final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name,
db.newIterator(), from, to);
         openIterators.add(rocksDBRangeIterator);
 
         return rocksDBRangeIterator;
     }
 
     @Override
-    public synchronized KeyValueIterator<K, V> all() {
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
         validateStoreOpen();
         // query rocksdb
-        RocksIterator innerIter = db.newIterator();
+        final RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
-        final RocksDbIterator rocksDbIterator = new RocksDbIterator(name, innerIter, serdes);
+        final RocksDbIterator rocksDbIterator = new RocksDbIterator(name, innerIter);
         openIterators.add(rocksDbIterator);
         return rocksDbIterator;
     }
 
-    public synchronized KeyValue<K, V> first() {
+    public synchronized KeyValue<Bytes, byte[]> first() {
         validateStoreOpen();
-        
-        RocksIterator innerIter = db.newIterator();
+
+        final RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
-        KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()),
serdes.valueFrom(innerIter.value()));
+        final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()),
innerIter.value());
         innerIter.close();
 
         return pair;
     }
 
-    public synchronized KeyValue<K, V> last() {
+    public synchronized KeyValue<Bytes, byte[]> last() {
         validateStoreOpen();
-        
-        RocksIterator innerIter = db.newIterator();
+
+        final RocksIterator innerIter = db.newIterator();
         innerIter.seekToLast();
-        KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()),
serdes.valueFrom(innerIter.value()));
+        final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()),
innerIter.value());
         innerIter.close();
 
         return pair;
@@ -419,10 +399,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
     @Override
     public long approximateNumEntries() {
         validateStoreOpen();
-        long value;
+        final long value;
         try {
             value = this.db.getLongProperty("rocksdb.estimate-num-keys");
-        } catch (RocksDBException e) {
+        } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error fetching property from store " + this.name,
e);
         }
         if (isOverflowing(value)) {
@@ -431,7 +411,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
         return value;
     }
 
-    private boolean isOverflowing(long value) {
+    private boolean isOverflowing(final long value) {
         // RocksDB returns an unsigned 8-byte integer, which could overflow long
         // and manifest as a negative value.
         return value < 0;
@@ -451,7 +431,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
     private void flushInternal() {
         try {
             db.flush(fOptions);
-        } catch (RocksDBException e) {
+        } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error while executing flush from store " +
this.name, e);
         }
     }
@@ -476,25 +456,24 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
     }
 
     private void closeOpenIterators() {
-        HashSet<KeyValueIterator> iterators;
+        final HashSet<KeyValueIterator> iterators;
         synchronized (openIterators) {
             iterators = new HashSet<>(openIterators);
         }
-        for (KeyValueIterator iterator : iterators) {
+        for (final KeyValueIterator iterator : iterators) {
             iterator.close();
         }
     }
 
-    private class RocksDbIterator implements KeyValueIterator<K, V> {
+    private class RocksDbIterator implements KeyValueIterator<Bytes, byte[]> {
         private final String storeName;
         private final RocksIterator iter;
-        private final StateSerdes<K, V> serdes;
 
         private volatile boolean open = true;
 
-        RocksDbIterator(String storeName, RocksIterator iter, StateSerdes<K, V> serdes)
{
+        RocksDbIterator(final String storeName,
+                        final RocksIterator iter) {
             this.iter = iter;
-            this.serdes = serdes;
             this.storeName = storeName;
         }
 
@@ -502,8 +481,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
             return iter.key();
         }
 
-        private KeyValue<K, V> getKeyValue() {
-            return new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+        private KeyValue<Bytes, byte[]> getKeyValue() {
+            return new KeyValue<>(new Bytes(iter.key()), iter.value());
         }
 
         @Override
@@ -519,11 +498,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
          * @throws NoSuchElementException if no next element exist
          */
         @Override
-        public synchronized KeyValue<K, V> next() {
+        public synchronized KeyValue<Bytes, byte[]> next() {
             if (!hasNext())
                 throw new NoSuchElementException();
 
-            KeyValue<K, V> entry = this.getKeyValue();
+            final KeyValue<Bytes, byte[]> entry = this.getKeyValue();
             iter.next();
             return entry;
         }
@@ -541,11 +520,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
         }
 
         @Override
-        public K peekNextKey() {
+        public Bytes peekNextKey() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
-            return serdes.keyFrom(iter.key());
+            return new Bytes(iter.key());
         }
     }
 
@@ -554,12 +533,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
         // comparator to be pluggable, and the default is lexicographic, so it's
         // safe to just force lexicographic comparator here for now.
         private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-        private byte[] rawToKey;
-
-        RocksDBRangeIterator(String storeName, RocksIterator iter, StateSerdes<K, V>
serdes, K from, K to) {
-            super(storeName, iter, serdes);
-            iter.seek(serdes.rawKey(from));
-            this.rawToKey = serdes.rawKey(to);
+        private final byte[] rawToKey;
+
+        RocksDBRangeIterator(final String storeName,
+                             final RocksIterator iter,
+                             final Bytes from,
+                             final Bytes to) {
+            super(storeName, iter);
+            iter.seek(from.get());
+            this.rawToKey = to.get();
             if (this.rawToKey == null) {
                 throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for
key " + to);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
index 7870579..b0ad619 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -36,9 +35,7 @@ public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupp
 
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        return new RocksDBStore<>(name,
-                                  Serdes.Bytes(),
-                                  Serdes.ByteArray());
+        return new RocksDBStore(name);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 50c1547..7b2b803 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -16,18 +16,16 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.io.IOException;
 
-// Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data
structures
-class Segment extends RocksDBStore<Bytes, byte[]> implements Comparable<Segment>
{
+class Segment extends RocksDBStore implements Comparable<Segment> {
     public final long id;
 
     Segment(String segmentName, String windowName, long id) {
-        super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
+        super(segmentName, windowName);
         this.id = id;
     }
 
@@ -43,7 +41,6 @@ class Segment extends RocksDBStore<Bytes, byte[]> implements Comparable<Segment>
     @Override
     public void openDB(final ProcessorContext context) {
         super.openDB(context);
-
         // skip the registering step
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 0a45803..992ffd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -157,7 +157,7 @@ public class TopologyTest {
     }
 
     @Test
-    public void shoudNotAllowToAddProcessorWithSameName() {
+    public void shouldNotAllowToAddProcessorWithSameName() {
         topology.addSource("source", "topic-1");
         topology.addProcessor("processor", new MockProcessorSupplier(), "source");
         try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 1f15a03..49e893b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -17,7 +17,12 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -54,13 +59,15 @@ import static org.junit.Assert.fail;
 public class RocksDBStoreTest {
     private final File tempDir = TestUtils.tempDirectory();
 
-    private RocksDBStore<String, String> subject;
+    private Serializer<String> stringSerializer = new StringSerializer();
+    private Deserializer<String> stringDeserializer = new StringDeserializer();
+    private RocksDBStore subject;
     private MockProcessorContext context;
     private File dir;
 
     @Before
     public void setUp() {
-        subject = new RocksDBStore<>("test", Serdes.String(), Serdes.String());
+        subject = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
         context = new MockProcessorContext(dir,
             Serdes.String(),
@@ -81,7 +88,8 @@ public class RocksDBStoreTest {
         final String message = "how can a 4 ounce bird carry a 2lb coconut";
         int intKey = 1;
         for (int i = 0; i < 2000000; i++) {
-            subject.put("theKeyIs" + intKey++, message);
+            subject.put(new Bytes(stringSerializer.serialize(null, "theKeyIs" + intKey++)),
+                stringSerializer.serialize(null, message));
         }
 
         final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
@@ -92,7 +100,11 @@ public class RocksDBStoreTest {
 
         context.restore("test", restoreBytes);
 
-        assertThat(subject.get("restoredKey"), equalTo("restoredValue"));
+        assertThat(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "restoredKey")))),
+            equalTo("restoredValue"));
     }
 
     @Test
@@ -122,18 +134,36 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldPutAll() {
-        List<KeyValue<String, String>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>("1", "a"));
-        entries.add(new KeyValue<>("2", "b"));
-        entries.add(new KeyValue<>("3", "c"));
+        List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "1")),
+            stringSerializer.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "2")),
+            stringSerializer.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(stringSerializer.serialize(null, "3")),
+            stringSerializer.serialize(null, "c")));
 
         subject.init(context, subject);
         subject.putAll(entries);
         subject.flush();
 
-        assertEquals(subject.get("1"), "a");
-        assertEquals(subject.get("2"), "b");
-        assertEquals(subject.get("3"), "c");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+            "a");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+            "b");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+            "c");
     }
 
     @Test
@@ -173,9 +203,21 @@ public class RocksDBStoreTest {
         subject.init(context, subject);
         context.restore(subject.name(), entries);
 
-        assertEquals(subject.get("1"), "a");
-        assertEquals(subject.get("2"), "b");
-        assertEquals(subject.get("3"), "c");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+            "a");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+            "b");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+            "c");
     }
 
 
@@ -187,11 +229,11 @@ public class RocksDBStoreTest {
         subject.init(context, subject);
         context.restore(subject.name(), entries);
 
-        final KeyValueIterator<String, String> iterator = subject.all();
+        final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
         final Set<String> keys = new HashSet<>();
 
         while (iterator.hasNext()) {
-            keys.add(iterator.next().key);
+            keys.add(stringDeserializer.deserialize(null, iterator.next().key.get()));
         }
 
         assertThat(keys, equalTo(Utils.mkSet("2", "3")));
@@ -211,18 +253,30 @@ public class RocksDBStoreTest {
         subject.init(context, subject);
         context.restore(subject.name(), entries);
 
-        final KeyValueIterator<String, String> iterator = subject.all();
+        final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
         final Set<String> keys = new HashSet<>();
 
         while (iterator.hasNext()) {
-            keys.add(iterator.next().key);
+            keys.add(stringDeserializer.deserialize(null, iterator.next().key.get()));
         }
 
         assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
 
-        assertEquals(subject.get("1"), "restored");
-        assertEquals(subject.get("2"), "b");
-        assertEquals(subject.get("3"), "c");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+            "restored");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+            "b");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+            "c");
     }
 
     @Test
@@ -233,9 +287,21 @@ public class RocksDBStoreTest {
 
         context.restore(subject.name(), entries);
 
-        assertEquals(subject.get("1"), "a");
-        assertEquals(subject.get("2"), "b");
-        assertEquals(subject.get("3"), "c");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+            "a");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+            "b");
+        assertEquals(
+            stringDeserializer.deserialize(
+                null,
+                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+            "c");
 
         entries.clear();
 
@@ -245,11 +311,11 @@ public class RocksDBStoreTest {
 
         context.restore(subject.name(), entries);
 
-        final KeyValueIterator<String, String> iterator = subject.all();
+        final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
         final Set<String> keys = new HashSet<>();
 
         while (iterator.hasNext()) {
-            keys.add(iterator.next().key);
+            keys.add(stringDeserializer.deserialize(null, iterator.next().key.get()));
         }
 
         assertThat(keys, equalTo(Utils.mkSet("2", "3")));
@@ -261,7 +327,7 @@ public class RocksDBStoreTest {
     public void shouldThrowNullPointerExceptionOnNullPut() {
         subject.init(context, subject);
         try {
-            subject.put(null, "someVal");
+            subject.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
         } catch (NullPointerException e) { }
     }
@@ -270,7 +336,7 @@ public class RocksDBStoreTest {
     public void shouldThrowNullPointerExceptionOnNullPutAll() {
         subject.init(context, subject);
         try {
-            subject.put(null, "someVal");
+            subject.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
         } catch (NullPointerException e) { }
     }
@@ -297,7 +363,7 @@ public class RocksDBStoreTest {
     public void shouldThrowNullPointerExceptionOnRange() {
         subject.init(context, subject);
         try {
-            subject.range(null, "2");
+            subject.range(null, new Bytes(stringSerializer.serialize(null, "2")));
             fail("Should have thrown NullPointerException on deleting null key");
         } catch (NullPointerException e) { }
     }
@@ -306,7 +372,9 @@ public class RocksDBStoreTest {
     public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
         subject.init(context, subject);
         Utils.delete(dir);
-        subject.put("anyKey", "anyValue");
+        subject.put(
+            new Bytes(stringSerializer.serialize(null, "anyKey")),
+            stringSerializer.serialize(null, "anyValue"));
         subject.flush();
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 6168640..ff63554 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -323,7 +323,7 @@ public class TopologyTestDriver {
                 offset,
                 consumerRecord.timestamp(),
                 consumerRecord.timestampType(),
-                consumerRecord.checksum(),
+                ConsumerRecord.NULL_CHECKSUM,
                 consumerRecord.serializedKeySize(),
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
@@ -376,7 +376,7 @@ public class TopologyTestDriver {
                 offset,
                 consumerRecord.timestamp(),
                 consumerRecord.timestampType(),
-                consumerRecord.checksum(),
+                ConsumerRecord.NULL_CHECKSUM,
                 consumerRecord.serializedKeySize(),
                 consumerRecord.serializedValueSize(),
                 consumerRecord.key(),
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index ea8d632..b0ccd61 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -175,7 +175,7 @@ public class ConsumerRecordFactory<K, V> {
             -1L,
             timestampMs,
             TimestampType.CREATE_TIME,
-            0L,
+            ConsumerRecord.NULL_CHECKSUM,
             serializedKey == null ? 0 : serializedKey.length,
             serializedValue == null ? 0 : serializedValue.length,
             serializedKey,
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 5073efd..921f6d6 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -190,6 +190,7 @@ public class TopologyTestDriverTest {
             context.forward(key, value);
         }
 
+        @SuppressWarnings("deprecation")
         @Override
         public void punctuate(long timestamp) {} // deprecated
 

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message