crunch-commits mailing list archives

From jwi...@apache.org
Subject [1/2] crunch git commit: CRUNCH-653: Created KafkaSource that provides ConsumerRecord messages
Date Fri, 27 Oct 2017 04:06:42 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 28ab19975 -> f47347814
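
In short, the commit adds a record-based KafkaSource under org.apache.crunch.kafka.record whose elements are full Kafka ConsumerRecord objects. A minimal usage sketch, assembled from the KafkaSourceIT test included below; the topic name, offset values, and the sketch class itself are illustrative placeholders, while the KafkaSource, Pipeline, and PCollection calls come from the diff:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.kafka.record.KafkaSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceUsageSketch {
  public static void main(String[] args) {
    // Standard Kafka consumer configuration (bootstrap.servers, group.id, ...).
    Properties consumerProps = new Properties();

    // Start/stop offsets per partition; the values here are placeholders.
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    offsets.put(new TopicPartition("example-topic", 0), Pair.of(0L, 100L));

    Pipeline pipeline = new MRPipeline(KafkaSourceUsageSketch.class, new Configuration());

    // The new source yields the full ConsumerRecord rather than just a key/value pair.
    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> source =
        new KafkaSource(consumerProps, offsets);
    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records = pipeline.read(source);

    // parallelDo/write as usual, then:
    pipeline.done();
  }
}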


http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java
new file mode 100644
index 0000000..9939d64
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.kafka.*;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordsIterableIT {
+
+  @Mock
+  private Consumer<String, String> mockedConsumer;
+
+  @Mock
+  private ConsumerRecords<String, String> records;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private String topic;
+  private Map<TopicPartition, Long> startOffsets;
+  private Map<TopicPartition, Long> stopOffsets;
+  private Map<TopicPartition, Pair<Long, Long>> offsets;
+  private Consumer<String, String> consumer;
+  private Properties props;
+  private Properties consumerProps;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setup() {
+    topic = UUID.randomUUID().toString();
+
+    props = ClusterTest.getConsumerProperties();
+
+    startOffsets = new HashMap<>();
+    stopOffsets = new HashMap<>();
+    offsets = new HashMap<>();
+
+    for (int i = 0; i < 4; i++) {
+      TopicPartition tp = new TopicPartition(topic, i);
+      startOffsets.put(tp, 0L);
+      stopOffsets.put(tp, 100L);
+
+      offsets.put(tp, Pair.of(0L, 100L));
+    }
+
+    consumerProps = new Properties();
+    consumerProps.putAll(props);
+  }
+
+  @After
+  public void shutdown() {
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nullConsumer() {
+    new KafkaRecordsIterable(null, offsets, new Properties());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nullOffsets() {
+    new KafkaRecordsIterable<>(consumer, null, new Properties());
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void emptyOffsets() {
+    consumer = new KafkaConsumer<>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<>(consumer,
+        Collections.<TopicPartition, Pair<Long, Long>>emptyMap(), new Properties());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nullProperties() {
+    new KafkaRecordsIterable(consumer, offsets, null);
+  }
+
+  @Test
+  public void iterateOverValues() {
+    consumer = new KafkaConsumer<>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      assertThat(keys, hasItem(record.key()));
+      assertTrue(keys.remove(record.key()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  @Test
+  public void iterateOverOneValue() {
+    consumer = new KafkaConsumer<>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 1;
+    int numPerLoop = 1;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      assertThat(keys, hasItem(record.key()));
+      assertTrue(keys.remove(record.key()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  @Test
+  public void iterateOverNothing() {
+    consumer = new KafkaConsumer<>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    writeData(props, topic, "batch", loops, numPerLoop);
+
+    //set the stop offsets equal to the start offsets so nothing will be iterated over
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStartOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      count++;
+    }
+
+    assertThat(count, is(0));
+  }
+
+  @Test
+  public void iterateOverPartial() {
+    consumer = new KafkaConsumer<>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    int numPerPartition = 50;
+
+    writeData(props, topic, "batch", loops, numPerLoop);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    //stop each partition at start + numPerPartition so only part of the data is read
+    startOffsets = getStartOffsets(props, topic);
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), entry.getValue() + numPerPartition));
+    }
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      count++;
+    }
+
+    assertThat(count, is(startOffsets.size() * numPerPartition));
+  }
+
+  @Test
+  public void dontIteratePastStop() {
+    consumer = new KafkaConsumer<>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+
+    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+    //capture the start/stop offsets before writing a second batch so that batch falls past the stop offsets
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      assertThat(keys, hasItem(record.key()));
+      assertTrue(keys.remove(record.key()));
+      assertThat(secondKeys, not(hasItem(record.key())));
+      count++;
+    }
+
+    assertThat(count, is(loops * numPerLoop));
+    assertThat(keys.size(), is(0));
+  }
+
+  @Test
+  public void iterateSkipInitialValues() {
+    consumer = new KafkaConsumer<>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+
+    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+    //start at the current end offsets so the first batch of values is skipped
+    startOffsets = getStopOffsets(props, topic);
+
+    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+    stopOffsets = getStopOffsets(props, topic);
+
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets,
+        new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      assertThat(secondKeys, hasItem(record.key()));
+      assertTrue(secondKeys.remove(record.key()));
+      assertThat(keys, not(hasItem(record.key())));
+      count++;
+    }
+
+    assertThat(count, is(loops * numPerLoop));
+    assertThat(secondKeys.size(), is(0));
+  }
+
+  @Test
+  public void iterateValuesWithExceptions() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+
+    for(int i = 0; i < 25; i++){
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 0, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 1, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 2, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 3, i, "key", null));
+    }
+
+    offsets = new HashMap<>();
+    offsets.put(new TopicPartition(topic, 0), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 1), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 2), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 3), Pair.of(0L, 25L));
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong()))
+        //null response for the first poll
+        .thenReturn(null)
+        //fail twice with retriable timeouts
+        .thenThrow(new TimeoutException("fail1"))
+        .thenThrow(new TimeoutException("fail2"))
+        //poll that returns the data
+        .thenReturn(records)
+        //null signals there is no more data to retrieve
+        .thenReturn(null);
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      count++;
+    }
+
+    //should have iterated over every returned record despite the transient poll failures
+    assertThat(count, is(returnedRecords.size()));
+  }
+
+  @Test
+  public void iterateValuesAfterStopOffsets() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+          entry.getKey().partition(), entry.getValue() + 1, "key", null));
+    }
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong())).thenReturn(records).thenReturn(records).thenReturn(null);
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      count++;
+    }
+
+    assertThat(count, is(0));
+
+  }
+
+  @Test(expected = RetriableException.class)
+  public void iterateRetriableExceptionMaxExceeded() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+          entry.getKey().partition(), entry.getValue() + 1, "key", null));
+    }
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong()))
+        //null response for the first poll call
+        .thenReturn(null)
+        //retry 5 times then fail
+        .thenThrow(new TimeoutException("fail1"))
+        .thenThrow(new TimeoutException("fail2"))
+        .thenThrow(new TimeoutException("fail3"))
+        .thenThrow(new TimeoutException("fail4"))
+        .thenThrow(new TimeoutException("fail5"))
+        .thenThrow(new TimeoutException("fail6"));
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    data.iterator().next();
+  }
+
+  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+  }
+
+  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+  }
+}
\ No newline at end of file
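
The start/stop offset wiring above (getStartOffsets/getStopOffsets plus the offsets-map loop) recurs in most of the tests; condensed into a single illustrative helper, assuming only the KafkaUtils.getBrokerOffsets call and the Pair-based range map shown in the diff (the offsetRanges name itself is not part of the change):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.api.OffsetRequest;
import org.apache.crunch.Pair;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.kafka.common.TopicPartition;

final class OffsetRangeSketch {
  // Illustrative helper mirroring getStartOffsets/getStopOffsets and the
  // offsets-map loop used throughout KafkaRecordsIterableIT above.
  static Map<TopicPartition, Pair<Long, Long>> offsetRanges(Properties props, String topic) {
    Map<TopicPartition, Long> start = KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
    Map<TopicPartition, Long> stop = KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : start.entrySet()) {
      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stop.get(entry.getKey())));
    }
    return offsets;
  }
}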

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
new file mode 100644
index 0000000..56241ae
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.To;
+import org.apache.crunch.kafka.*;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+
+public class KafkaSourceIT {
+
+  @Rule
+  public TemporaryPath path = new TemporaryPath();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private Properties consumerProps;
+  private String topic;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setupTest() {
+    topic = UUID.randomUUID().toString();
+    consumerProps = ClusterTest.getConsumerProperties();
+  }
+
+  @Test
+  public void defaultEarliestOffsetReset() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();
+
+    //Remove the explicit setting so the source falls back to its default ("earliest").
+    consumerProps.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+
+    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    FormatBundle inputBundle = kafkaSource.getInputBundle();
+    Configuration cfg = new Configuration(false);
+    inputBundle.configure(cfg);
+    Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg);
+    kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);
+    assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("earliest"));
+  }
+
+  @Test
+  public void offsetResetOverridable() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();
+
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    FormatBundle inputBundle = kafkaSource.getInputBundle();
+    Configuration cfg = new Configuration(false);
+    inputBundle.configure(cfg);
+    Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg);
+    kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);
+    assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("latest"));
+  }
+
+  @Test
+  public void sourceReadData() {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    Configuration config = ClusterTest.getConf();
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+    pipeline.enableDebug();
+
+    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = pipeline.read(kafkaSource);
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    String currentKey;
+    for (ConsumerRecord<BytesWritable, BytesWritable> record : read.materialize()) {
+      currentKey = new String(record.key().getBytes());
+      assertThat(keys, hasItem(currentKey));
+      numRecordsFound++;
+      keysRead.add(new String(record.key().getBytes()));
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
+
+
+  @Test
+  public void sourceReadDataThroughPipeline() {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    Configuration config = ClusterTest.getConf();
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+    pipeline.enableDebug();
+
+    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = pipeline.read(kafkaSource);
+    Path out = path.getPath("out");
+    read.parallelDo(new KafkaSourceIT.SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
+
+    pipeline.run();
+
+    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    for (String value : persistedKeys.materialize()) {
+      assertThat(keys, hasItem(value));
+      numRecordsFound++;
+      keysRead.add(value);
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
+
+
+  private static class SimpleConvertFn extends MapFn<ConsumerRecord<BytesWritable, BytesWritable>, String> {
+    @Override
+    public String map(ConsumerRecord<BytesWritable, BytesWritable> record) {
+      return new String(record.key().getBytes());
+    }
+  }
+}

