kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6896: Add producer metrics exporting in KafkaStreams (#4998)
Date Tue, 15 May 2018 21:29:17 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1e207b2  KAFKA-6896: Add producer metrics exporting in KafkaStreams (#4998)
1e207b2 is described below

commit 1e207b2ef83db4296fb944ade4c301794f9e8460
Author: Boyang Chen <bchen11@outlook.com>
AuthorDate: Tue May 15 14:29:07 2018 -0700

    KAFKA-6896: Add producer metrics exporting in KafkaStreams (#4998)
    
    We would like to export the producer metrics from StreamThread, just as we already do
    for consumer metrics, to gain more visibility into a Streams application. The approach is
    to pass the threadProducer into the StreamThread so that its metrics can be exported
    dynamically.
    
    Note that this is a purely internal change that doesn't require a KIP. In the future
    we also want to export admin client metrics; a follow-up KIP for the admin client will be
    created once this is merged.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/producer/MockProducer.java       | 12 +++++-
 .../org/apache/kafka/streams/KafkaStreams.java     |  3 +-
 .../streams/processor/internals/StreamTask.java    |  4 ++
 .../streams/processor/internals/StreamThread.java  | 23 +++++++++++
 .../processor/internals/StreamThreadTest.java      | 46 +++++++++++++++++++++-
 5 files changed, 84 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index ddf5b88..9e9869a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.Serializer;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
@@ -70,6 +69,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private boolean producerFenced;
     private boolean sentOffsets;
     private long commitCount = 0L;
+    private Map<MetricName, Metric> mockMetrics;
 
     /**
      * Create a mock producer
@@ -99,6 +99,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.consumerGroupOffsets = new ArrayList<>();
         this.uncommittedConsumerGroupOffsets = new HashMap<>();
         this.completions = new ArrayDeque<>();
+        this.mockMetrics = new HashMap<>();
     }
 
     /**
@@ -293,7 +294,14 @@ public class MockProducer<K, V> implements Producer<K, V> {
     }
 
     public Map<MetricName, Metric> metrics() {
-        return Collections.emptyMap();
+        return mockMetrics;
+    }
+
+    /**
+     * Set a mock metric for testing purpose
+     */
+    public void setMockMetrics(MetricName name, Metric metric) {
+        mockMetrics.put(name, metric);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f36bbac..612a453 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -383,10 +383,11 @@ public class KafkaStreams {
      *
      * @return Map of all metrics.
      */
-    // TODO: we can add metrics for producer and admin client as well
+    // TODO: we can add metrics for admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
         final Map<MetricName, Metric> result = new LinkedHashMap<>();
         for (final StreamThread thread : threads) {
+            result.putAll(thread.producerMetrics());
             result.putAll(thread.consumerMetrics());
         }
         if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b975324..633e7ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -742,4 +742,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     RecordCollector recordCollector() {
         return recordCollector;
     }
+
+    Producer<byte[], byte[]> getProducer() {
+        return producer;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 32993f6..ddefd9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -569,6 +569,7 @@ public class StreamThread extends Thread {
 
     // package-private for testing
     final ConsumerRebalanceListener rebalanceListener;
+    final Producer<byte[], byte[]> producer;
     final Consumer<byte[], byte[]> restoreConsumer;
     final Consumer<byte[], byte[]> consumer;
     final InternalTopologyBuilder builder;
@@ -658,6 +659,7 @@ public class StreamThread extends Thread {
         return new StreamThread(
             time,
             config,
+            threadProducer,
             restoreConsumer,
             consumer,
             originalReset,
@@ -670,6 +672,7 @@ public class StreamThread extends Thread {
 
     public StreamThread(final Time time,
                         final StreamsConfig config,
+                        final Producer<byte[], byte[]> producer,
                         final Consumer<byte[], byte[]> restoreConsumer,
                         final Consumer<byte[], byte[]> consumer,
                         final String originalReset,
@@ -690,6 +693,7 @@ public class StreamThread extends Thread {
         this.log = logContext.logger(StreamThread.class);
         this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
         this.taskManager = taskManager;
+        this.producer = producer;
         this.restoreConsumer = restoreConsumer;
         this.consumer = consumer;
         this.originalReset = originalReset;
@@ -1217,6 +1221,25 @@ public class StreamThread extends Thread {
         return standbyRecords;
     }
 
+    public Map<MetricName, Metric> producerMetrics() {
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        if (producer != null) {
+            final Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
+            if (producerMetrics != null) {
+                result.putAll(producerMetrics);
+            }
+        } else {
+            // When EOS is turned on, each task will have its own producer client
+            // and the producer object passed in here will be null. We would then iterate through
+            // all the active tasks and add their metrics to the output metrics map.
+            for (StreamTask task: taskManager.activeTasks().values()) {
+                final Map<MetricName, ? extends Metric> taskProducerMetrics = task.getProducer().metrics();
+                result.putAll(taskProducerMetrics);
+            }
+        }
+        return result;
+    }
+
     public Map<MetricName, Metric> consumerMetrics() {
         final Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
         final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5bc1934..749d618 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -29,7 +29,11 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -286,6 +290,7 @@ public class StreamThreadTest {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -318,6 +323,7 @@ public class StreamThreadTest {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -350,6 +356,7 @@ public class StreamThreadTest {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -496,6 +503,7 @@ public class StreamThreadTest {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -531,6 +539,7 @@ public class StreamThreadTest {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -557,6 +566,7 @@ public class StreamThreadTest {
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
+            null,
             consumer,
             consumer,
             null,
@@ -1242,6 +1252,40 @@ public class StreamThreadTest {
         assertTrue(metadata.standbyTasks().isEmpty());
     }
 
+    @Test
+    // TODO: Need to add a test case covering EOS when we create a mock taskManager class
+    public void producerMetricsVerificationWithoutEOS() {
+        final MockProducer<byte[], byte[]> producer = new MockProducer();
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread thread = new StreamThread(
+                mockTime,
+                config,
+                producer,
+                consumer,
+                consumer,
+                null,
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
+        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
+        final Metric testMetric = new KafkaMetric(
+                new Object(),
+                testMetricName,
+                new Measurable() {
+                    @Override
+                    public double measure(MetricConfig config, long now) {
+                        return 0;
+                    }
+                },
+                null,
+                new MockTime());
+        producer.setMockMetrics(testMetricName, testMetric);
+        Map<MetricName, Metric> producerMetrics = thread.producerMetrics();
+        assertEquals(testMetricName, producerMetrics.get(testMetricName).metricName());
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message