kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info (#5103)
Date Thu, 07 Jun 2018 03:44:22 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 79c57f2  KAFKA-6538: Changes to enhance  ByteStore exceptions thrown from RocksDBStore
with more human readable info (#5103)
79c57f2 is described below

commit 79c57f23c0df60f0e4e29b29258baf87a8171c83
Author: Jagadesh Adireddi <adireddijagadesh@gmail.com>
AuthorDate: Thu Jun 7 09:12:41 2018 +0530

    KAFKA-6538: Changes to enhance  ByteStore exceptions thrown from RocksDBStore with more
human readable info (#5103)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../state/internals/InnerMeteredKeyValueStore.java | 72 +++++++++++++---------
 .../state/internals/MeteredSessionStore.java       |  7 +++
 .../state/internals/MeteredWindowStore.java        |  4 ++
 .../streams/state/internals/RocksDBStore.java      |  9 ++-
 4 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index 14464e0..200b2d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -173,30 +174,40 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
 
     @Override
     public V get(final K key) {
-        if (getTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
-                }
-            }, getTime);
-        } else {
-            return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+        try {
+            if (getTime.shouldRecord()) {
+                return measureLatency(new Action<V>() {
+                    @Override
+                    public V execute() {
+                        return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+                    }
+                }, getTime);
+            } else {
+                return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
         }
     }
 
     @Override
     public void put(final K key, final V value) {
-        if (putTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
-                    return null;
-                }
-            }, putTime);
-        } else {
-            inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+        try {
+            if (putTime.shouldRecord()) {
+                measureLatency(new Action<V>() {
+                    @Override
+                    public V execute() {
+                        inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+                        return null;
+                    }
+                }, putTime);
+            } else {
+                inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, value);
+            throw new ProcessorStateException(message, e);
         }
     }
 
@@ -232,15 +243,20 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
 
     @Override
     public V delete(final K key) {
-        if (deleteTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
-                }
-            }, deleteTime);
-        } else {
-            return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+        try {
+            if (deleteTime.shouldRecord()) {
+                return measureLatency(new Action<V>() {
+                    @Override
+                    public V execute() {
+                        return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+                    }
+                }, deleteTime);
+            } else {
+                return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 5636219..3e881ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -128,6 +129,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         try {
             final Bytes key = keyBytes(sessionKey.key());
             inner.remove(new Windowed<>(key, sessionKey.window()));
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), sessionKey.key());
+            throw new ProcessorStateException(message, e);
         } finally {
             this.metrics.recordLatency(removeTime, startNs, time.nanoseconds());
         }
@@ -140,6 +144,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         try {
             final Bytes key = keyBytes(sessionKey.key());
             this.inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), sessionKey.key(), aggregate);
+            throw new ProcessorStateException(message, e);
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2487854..62ed6c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -94,6 +95,9 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         final long startNs = time.nanoseconds();
         try {
             inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, value);
+            throw new ProcessorStateException(message, e);
         } finally {
             metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ff6c56a..cfef035 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -224,7 +224,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>
{
         try {
             return this.db.get(rawKey);
         } catch (final RocksDBException e) {
-            throw new ProcessorStateException("Error while getting value for key from store
" + this.name, e);
+            // String format is happening in wrapping stores. So formatted message is thrown
from wrapping stores.
+            throw new ProcessorStateException("Error while getting value for key %s from
store " + this.name, e);
         }
     }
 
@@ -300,13 +301,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>
{
             try {
                 db.delete(wOptions, rawKey);
             } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error while removing key from store "
+ this.name, e);
+                // String format is happening in wrapping stores. So formatted message is
thrown from wrapping stores.
+                throw new ProcessorStateException("Error while removing key %s from store
" + this.name, e);
             }
         } else {
             try {
                 db.put(wOptions, rawKey, rawValue);
             } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error while executing putting key/value
into store " + this.name, e);
+                // String format is happening in wrapping stores. So formatted message is
thrown from wrapping stores.
+                throw new ProcessorStateException("Error while putting key %s value %s into
store " + this.name, e);
             }
         }
     }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message