kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5576: RocksDB upgrade to 5.8, plus one bug fix on Bytes.wrap
Date Fri, 06 Oct 2017 00:03:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 42841c9f3 -> b113b4bd3


KAFKA-5576: RocksDB upgrade to 5.8, plus one bug fix on Bytes.wrap

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3819 from guozhangwang/KMinor-rocksDB-573

(cherry picked from commit 196bcfca0c56420793f85514d1602bde564b0651)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b113b4bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b113b4bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b113b4bd

Branch: refs/heads/1.0
Commit: b113b4bd3e3802e8e6974e6754e96bd0f1de32e2
Parents: 42841c9
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Oct 5 17:02:53 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Oct 5 17:03:01 2017 -0700

----------------------------------------------------------------------
 clients/src/main/java/org/apache/kafka/common/utils/Bytes.java | 2 ++
 gradle/dependencies.gradle                                     | 2 +-
 .../streams/state/internals/ChangeLoggingWindowBytesStore.java | 6 ++----
 .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 6 +-----
 .../streams/state/internals/AbstractKeyValueStoreTest.java     | 1 +
 5 files changed, 7 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index 3044020..e531d1f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -33,6 +33,8 @@ public class Bytes implements Comparable<Bytes> {
     private int hashCode;
 
     public static Bytes wrap(byte[] bytes) {
+        if (bytes == null)
+            return null;
         return new Bytes(bytes);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index a7c99eb..c9983ce 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -64,7 +64,7 @@ versions += [
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
   powermock: "2.0.0-beta.5",
   reflections: "0.9.11",
-  rocksDB: "5.3.6",
+  rocksDB: "5.8.0",
   scalatest: "3.0.4",
   scoverage: "1.3.1",
   slf4j: "1.7.25",

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index da99d55..0035019 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -64,10 +64,8 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
 
     @Override
     public void put(final Bytes key, final byte[] value, final long timestamp) {
-        if (key != null) {
-            bytesStore.put(key, value, timestamp);
-            changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value);
-        }
+        bytesStore.put(key, value, timestamp);
+        changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d8a844c..c219314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -269,11 +269,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 } catch (RocksDBException e) {
                     throw new ProcessorStateException("Error while range compacting during restoring  store " + this.name, e);
                 }
-
-                // we need to re-open with the old num.levels again, this is a workaround
-                // until https://github.com/facebook/rocksdb/pull/2740 is merged in rocksdb
-                close();
-                openDB(internalProcessorContext);
             }
         }
 
@@ -372,6 +367,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         // query rocksdb
         final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
         openIterators.add(rocksDBRangeIterator);
+
         return rocksDBRangeIterator;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index af917e6..65a9dec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -162,6 +162,7 @@ public abstract class AbstractKeyValueStoreTest {
         // receive the restore entries ...
         store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
         context.restore(store.name(), driver.restoredEntries());
+
         // Verify that the store's contents were properly restored ...
         assertEquals(0, driver.checkForRestoredEntries(store));
 


Mime
View raw message