kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
Date Wed, 31 Jan 2018 21:53:18 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b822206  KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
b822206 is described below

commit b8222065e0cc09f455dc5b2aa43c1d68cf0d4a1f
Author: Panuwat Anawatmongkhon <benzbicykle@gmail.com>
AuthorDate: Thu Feb 1 04:53:15 2018 +0700

    KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
    
    - implements KIP-233
    
    Author: Panuwat Anawatmongkhon <panuwat.anawatmongkhon@gmail.com>
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   | 34 ++++++++++++++++------
 .../kstream/internals/InternalStreamsBuilder.java  | 16 ++++++++++
 2 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 27105c6..f08098e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -454,6 +454,28 @@ public class StreamsBuilder {
     }
 
     /**
+     * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
+    public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
+                                                      final String topic,
+                                                      final String sourceName,
+                                                      final Consumed consumed,
+                                                      final String processorName,
+                                                      final ProcessorSupplier stateUpdateSupplier) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(storeBuilder,
+                                              sourceName,
+                                              topic,
+                                              new ConsumedInternal<>(consumed),
+                                              processorName,
+                                              stateUpdateSupplier);
+        return this;
+    }
+
+    /**
      * Adds a global {@link StateStore} to the topology.
      * The {@link StateStore} sources its data from all partitions of the provided input topic.
      * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
@@ -467,10 +489,8 @@ public class StreamsBuilder {
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
      * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
      * @param topic                 the topic to source the data from
      * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
-     * @param processorName         the name of the {@link ProcessorSupplier}
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
@@ -478,18 +498,14 @@ public class StreamsBuilder {
     @SuppressWarnings("unchecked")
     public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
                                                       final String topic,
-                                                      final String sourceName,
                                                       final Consumed consumed,
-                                                      final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         internalStreamsBuilder.addGlobalStore(storeBuilder,
-                                              sourceName,
-                                              topic,
-                                              new ConsumedInternal<>(consumed),
-                                              processorName,
-                                              stateUpdateSupplier);
+                topic,
+                new ConsumedInternal<>(consumed),
+                stateUpdateSupplier);
         return this;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 2a8a89e..0b028e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -194,4 +194,20 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                processorName,
                                                stateUpdateSupplier);
     }
+    
+    public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
+                                            final String topic,
+                                            final ConsumedInternal consumed,
+                                            final ProcessorSupplier stateUpdateSupplier) {
+        // explicitly disable logging for global stores
+        storeBuilder.withLoggingDisabled();
+        final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
+        addGlobalStore(storeBuilder,
+                       sourceName,
+                       topic,
+                       consumed,
+                       processorName,
+                       stateUpdateSupplier);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message