beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-351] Add DisplayData to KafkaIO
Date Wed, 01 Mar 2017 17:44:34 GMT
Repository: beam
Updated Branches:
  refs/heads/master d84b06791 -> 3b3d6b81a


[BEAM-351] Add DisplayData to KafkaIO

Changes after review.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52e2d3a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52e2d3a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52e2d3a7

Branch: refs/heads/master
Commit: 52e2d3a77096460ae4a10ac977b4897a1eecf3a1
Parents: d84b067
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Sun Feb 26 22:39:28 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Wed Mar 1 19:28:26 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/kafka/pom.xml                      |  7 +++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 ++++++++++++++++++
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 53 +++++++++++++++++++-
 3 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 6935f1e..d5ffe63 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -149,6 +149,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 5fd34b9..890fb2b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,12 +64,14 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
 import org.apache.beam.sdk.values.KV;
@@ -500,6 +503,27 @@ public class KafkaIO {
             return new KafkaConsumer<>(config);
           }
         };
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      List<String> topics = getTopics();
+      List<TopicPartition> topicPartitions = getTopicPartitions();
+      if (topics.size() > 0) {
+        builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
+      } else if (topicPartitions.size() > 0) {
+        builder.add(DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
+            .withLabel("Topic Partition/s"));
+      }
+      Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
+      for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
+        String key = conf.getKey();
+        if (!ignoredConsumerPropertiesKeys.contains(key)) {
+          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+        }
+      }
+    }
   }
 
   /**
@@ -527,6 +551,12 @@ public class KafkaIO {
                 }
               }));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      read.populateDisplayData(builder);
+    }
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1222,6 +1252,19 @@ public class KafkaIO {
         configForKeySerializer(), "Reserved for internal serializer",
         configForValueSerializer(), "Reserved for internal serializer"
      );
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
+      Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
+      for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
+        String key = conf.getKey();
+        if (!ignoredProducerPropertiesKeys.contains(key)) {
+          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+        }
+      }
+    }
   }
 
   /**
@@ -1248,6 +1291,12 @@ public class KafkaIO {
         .setCoder(KvCoder.of(new NullOnlyCoder<K>(), kvWriteTransform.getValueCoder()))
         .apply(kvWriteTransform);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      kvWriteTransform.populateDisplayData(builder);
+    }
   }
 
   private static class NullOnlyCoder<T> extends AtomicCoder<T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 9d7c27b..1897127 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -24,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -58,6 +60,7 @@ import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -223,7 +226,7 @@ public class KafkaIOTest {
     List<String> topics = ImmutableList.of("topic_a", "topic_b");
 
     KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
-        .withBootstrapServers("none")
+        .withBootstrapServers("myServer1:9092,myServer2:9092")
         .withTopics(topics)
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
@@ -619,6 +622,54 @@ public class KafkaIOTest {
     }
   }
 
+  @Test
+  public void testSourceDisplayData() {
+    KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
+    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+  }
+
+  @Test
+  public void testSourceWithExplicitPartitionsDisplayData() {
+    KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
+        .withBootstrapServers("myServer1:9092,myServer2:9092")
+        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
+            new TopicPartition("test", 6)))
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
+        .withKeyCoder(ByteArrayCoder.of())
+        .withValueCoder(BigEndianLongCoder.of());
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
+    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+  }
+
+  @Test
+  public void testSinkDisplayData() {
+    KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
+        .withBootstrapServers("myServerA:9092,myServerB:9092")
+        .withTopic("myTopic")
+        .withValueCoder(BigEndianLongCoder.of())
+        .withProducerFactoryFn(new ProducerFactoryFn());
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+    assertThat(displayData, hasDisplayItem("retries", 3));
+  }
+
   private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent)
{
 
     // verify that appropriate messages are written to kafka


Mime
View raw message