kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Need to get a new transformer for each get() call. can't share'em (#4435)
Date Thu, 18 Jan 2018 16:45:28 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5701ba8  MINOR: Need to get a new transformer for each get() call. can't share'em (#4435)
5701ba8 is described below

commit 5701ba88f895ac9211f25c15204d6281aed00ffc
Author: dan norwood <dan.norwood@gmail.com>
AuthorDate: Thu Jan 18 08:45:22 2018 -0800

    MINOR: Need to get a new transformer for each get() call. can't share'em (#4435)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
---
 .../streams/kstream/internals/AbstractStream.java  |  4 +--
 .../kstream/internals/AbstractStreamTest.java      | 33 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 51fe820..41cdec2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -131,10 +131,10 @@ public abstract class AbstractStream<K> {
 
     static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
-        final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
         return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
             @Override
             public InternalValueTransformerWithKey<K, V, VR> get() {
+                final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
                 return new InternalValueTransformerWithKey<K, V, VR>() {
                     @Override
                     public VR punctuate(final long timestamp) {
@@ -162,10 +162,10 @@ public abstract class AbstractStream<K> {
 
     static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKeySupplier) {
         Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null");
-        final ValueTransformerWithKey<K, V, VR> valueTransformerWithKey = valueTransformerWithKeySupplier.get();
         return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
             @Override
             public InternalValueTransformerWithKey<K, V, VR> get() {
+                final ValueTransformerWithKey<K, V, VR> valueTransformerWithKey = valueTransformerWithKeySupplier.get();
                 return new InternalValueTransformerWithKey<K, V, VR>() {
                     @Override
                     public VR punctuate(final long timestamp) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index e16d8e4..dcfb9ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -30,6 +32,10 @@ import org.junit.Test;
 
 import java.util.Random;
 
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertTrue;
 
 public class AbstractStreamTest {
@@ -39,6 +45,33 @@ public class AbstractStreamTest {
     public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Test
+    public void testToInternlValueTransformerSupplierSuppliesNewTransformers() {
+        final ValueTransformerSupplier vts = createMock(ValueTransformerSupplier.class);
+        expect(vts.get()).andReturn(null).times(3);
+        final InternalValueTransformerWithKeySupplier ivtwks =
+            AbstractStream.toInternalValueTransformerSupplier(vts);
+        replay(vts);
+        ivtwks.get();
+        ivtwks.get();
+        ivtwks.get();
+        verify(vts);
+    }
+
+    @Test
+    public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
+        final ValueTransformerWithKeySupplier vtwks =
+            createMock(ValueTransformerWithKeySupplier.class);
+        expect(vtwks.get()).andReturn(null).times(3);
+        final InternalValueTransformerWithKeySupplier ivtwks =
+            AbstractStream.toInternalValueTransformerSupplier(vtwks);
+        replay(vtwks);
+        ivtwks.get();
+        ivtwks.get();
+        ivtwks.get();
+        verify(vtwks);
+    }
+
+    @Test
     public void testShouldBeExtensible() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Mime
View raw message