kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [13/50] [abbrv] kafka git commit: KAKFA-3599: Move WindowStoreUtils to package "internals"
Date Fri, 29 Apr 2016 22:05:27 GMT
KAKFA-3599: Move WindowStoreUtils to package "internals"

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma, Michael G. Noll, Guozhang Wang

Closes #1266 from mjsax/kafka-3599-minorCodeCleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d37b9f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d37b9f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d37b9f4

Branch: refs/heads/0.10.0
Commit: 9d37b9f4b6ba228ff7f7b99c8a0921a971fc03a6
Parents: 088ab3e
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Apr 26 10:53:49 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Apr 26 10:53:49 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/WindowStoreUtils.java   | 63 -------------------
 .../streams/state/internals/RocksDBStore.java   |  1 -
 .../state/internals/RocksDBWindowStore.java     |  1 -
 .../state/internals/WindowStoreUtils.java       | 65 ++++++++++++++++++++
 .../state/internals/RocksDBWindowStoreTest.java |  1 -
 5 files changed, 65 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
deleted file mode 100644
index 2f99ad6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-
-import java.nio.ByteBuffer;
-
-public class WindowStoreUtils {
-
-    private static final int SEQNUM_SIZE = 4;
-    private static final int TIMESTAMP_SIZE = 8;
-
-    /** Inner byte array serde used for segments */
-    public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
-    public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
-    public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner",
INNER_KEY_SERDE, INNER_VALUE_SERDE);
-
-    @SuppressWarnings("unchecked")
-    public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes,
byte[]>[]) new KeyValueIterator[0];
-
-    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum,
StateSerdes<K, ?> serdes) {
-        byte[] serializedKey = serdes.rawKey(key);
-
-        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
-        buf.put(serializedKey);
-        buf.putLong(timestamp);
-        buf.putInt(seqnum);
-
-        return buf.array();
-    }
-
-    public static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?>
serdes) {
-        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
-
-        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-
-        return serdes.keyFrom(bytes);
-    }
-
-    public static long timestampFromBinaryKey(byte[] binaryKey) {
-        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3fef0ef..37609a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import org.apache.kafka.streams.state.WindowStoreUtils;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 4c964c6..803a089 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.WindowStoreUtils;
 
 import java.io.File;
 import java.text.SimpleDateFormat;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
new file mode 100644
index 0000000..30693e7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.nio.ByteBuffer;
+
+public class WindowStoreUtils {
+
+    private static final int SEQNUM_SIZE = 4;
+    private static final int TIMESTAMP_SIZE = 8;
+
+    /** Inner byte array serde used for segments */
+    public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
+    public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
+    public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner",
INNER_KEY_SERDE, INNER_VALUE_SERDE);
+
+    @SuppressWarnings("unchecked")
+    public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes,
byte[]>[]) new KeyValueIterator[0];
+
+    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum,
StateSerdes<K, ?> serdes) {
+        byte[] serializedKey = serdes.rawKey(key);
+
+        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
+        buf.put(serializedKey);
+        buf.putLong(timestamp);
+        buf.putInt(seqnum);
+
+        return buf.array();
+    }
+
+    public static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?>
serdes) {
+        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
+
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+
+        return serdes.keyFrom(bytes);
+    }
+
+    public static long timestampFromBinaryKey(byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 502870b..e9888ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.WindowStoreUtils;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 


Mime
View raw message