beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/3] beam git commit: [BEAM-1272] Align the naming of "generateInitialSplits" and "splitIntoBundles" to better reflect their intention
Date Tue, 18 Apr 2017 19:01:42 GMT
Repository: beam
Updated Branches:
  refs/heads/master 4124cc687 -> 3101e69c4


http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 8740204..7edda1a 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -214,7 +214,7 @@ public class JmsIOTest {
     PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
     int desiredNumSplits = 5;
     JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read);
-    List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits,
+    List<JmsIO.UnboundedJmsSource> splits = initialSource.split(desiredNumSplits,
         pipelineOptions);
     // in the case of a queue, we have concurrent consumers by default, so the initial number
     // splits is equal to the desired number of splits
@@ -227,7 +227,7 @@ public class JmsIOTest {
     PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
     int desiredNumSplits = 5;
     JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read);
-    List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits,
+    List<JmsIO.UnboundedJmsSource> splits = initialSource.split(desiredNumSplits,
         pipelineOptions);
     // in the case of a topic, we can have only an unique subscriber on the topic per pipeline
     // else it means we can have duplicate messages (all subscribers on the topic receive every

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 69d82bc..7feb8d0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -153,7 +153,7 @@ import org.slf4j.LoggerFactory;
  * <h3>Partition Assignment and Checkpointing</h3>
  * The Kafka partitions are evenly distributed among splits (workers).
  * Checkpointing is fully supported and each split can resume from previous checkpoint. See
- * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details on
+ * {@link UnboundedKafkaSource#split(int, PipelineOptions)} for more details on
  * splits and checkpoint support.
  *
 * <p>When the pipeline starts for the first time without any checkpoint, the source starts
@@ -311,7 +311,7 @@ public class KafkaIO {
 
     /**
      * Returns a new {@link Read} that reads from the topic.
-     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopic(String topic) {
@@ -321,7 +321,7 @@ public class KafkaIO {
     /**
      * Returns a new {@link Read} that reads from the topics. All the partitions from each
      * of the topics are read.
-     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopics(List<String> topics) {
@@ -333,7 +333,7 @@ public class KafkaIO {
     /**
      * Returns a new {@link Read} that reads from the partitions. This allows reading only
a subset
      * of partitions for one or more topics when (if ever) needed.
-     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
@@ -626,7 +626,7 @@ public class KafkaIO {
      * {@code <topic, partition>} and then assigned to splits in round-robin order.
      */
     @Override
-    public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
+    public List<UnboundedKafkaSource<K, V>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
 
       List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions());
@@ -698,7 +698,7 @@ public class KafkaIO {
         LOG.warn("Looks like generateSplits() is not called. Generate single split.");
         try {
           return new UnboundedKafkaReader<K, V>(
-              generateInitialSplits(1, options).get(0), checkpointMark);
+              split(1, options).get(0), checkpointMark);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index ecbc71d..2b11162 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -389,7 +389,7 @@ public class KafkaIOTest {
     UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
         mkKafkaReadTransform(numElements, null).makeSource();
     List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
-        initial.generateInitialSplits(numSplits, p.getOptions());
+        initial.split(numSplits, p.getOptions());
     assertEquals("Expected exact splitting", numSplits, splits.size());
 
     long elementsPerSplit = numElements / numSplits;
@@ -446,7 +446,7 @@ public class KafkaIOTest {
     UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
         mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
           .makeSource()
-          .generateInitialSplits(1, PipelineOptionsFactory.create())
+          .split(1, PipelineOptionsFactory.create())
           .get(0);
 
     UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
@@ -486,7 +486,7 @@ public class KafkaIOTest {
     UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
         mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn())
             .makeSource()
-            .generateInitialSplits(1, PipelineOptionsFactory.create())
+            .split(1, PipelineOptionsFactory.create())
             .get(0);
 
     UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
@@ -515,7 +515,7 @@ public class KafkaIOTest {
         .withMaxNumRecords(numElements)
         .withTimestampFn(new ValueAsTimestampFn())
         .makeSource()
-        .generateInitialSplits(1, PipelineOptionsFactory.create())
+        .split(1, PipelineOptionsFactory.create())
         .get(0);
 
     reader = source.createReader(null, mark);

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 45e0b51..7e67d07 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -56,7 +56,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
      * {@code desiredNumSplits} partitions. Each partition is then a split.
      */
     @Override
-    public List<KinesisSource> generateInitialSplits(int desiredNumSplits,
+    public List<KinesisSource> split(int desiredNumSplits,
                                                     PipelineOptions options) throws Exception {
         KinesisReaderCheckpoint checkpoint =
                 initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index e193d29..940d875 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -381,8 +381,8 @@ public class MongoDbGridFSIO {
       }
 
       @Override
-      public List<? extends BoundedSource<ObjectId>> splitIntoBundles(long desiredBundleSizeBytes,
-          PipelineOptions options) throws Exception {
+      public List<? extends BoundedSource<ObjectId>> split(
+          long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
         Mongo mongo = spec.connectionConfiguration().setupMongo();
         try {
           GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo);

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 09b8505..2b7fb0a 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -225,7 +225,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public List<BoundedSource<Document>> splitIntoBundles(long desiredBundleSizeBytes,
+    public List<BoundedSource<Document>> split(long desiredBundleSizeBytes,
                                                 PipelineOptions options) {
       MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
       MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index 71718e3..8e7f03b 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -267,7 +267,7 @@ public class MongoDBGridFSIOTest implements Serializable {
 
     // make sure 2 files can fit in
     long desiredBundleSizeBytes = (src.getEstimatedSizeBytes(options) * 2L) / 5L + 1000;
-    List<? extends BoundedSource<ObjectId>> splits = src.splitIntoBundles(
+    List<? extends BoundedSource<ObjectId>> splits = src.split(
         desiredBundleSizeBytes, options);
 
     int expectedNbSplits = 3;

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 820b265..1df445f 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -365,7 +365,7 @@ public class MqttIO {
     }
 
     @Override
-    public List<UnboundedMqttSource> generateInitialSplits(int desiredNumSplits,
+    public List<UnboundedMqttSource> split(int desiredNumSplits,
                                                            PipelineOptions options) {
       // MQTT is based on a pub/sub pattern
       // so, if we create several subscribers on the same topic, they all will receive the same


Mime
View raw message