beam-commits mailing list archives

From j...@apache.org
Subject [1/4] beam-site git commit: [BEAM-1353] Style Guide fixups
Date Mon, 15 May 2017 19:16:31 GMT
Repository: beam-site
Updated Branches:
  refs/heads/asf-site 9cc5b2280 -> 9ffe5ec58


[BEAM-1353] Style Guide fixups

Fixes usages of PTransforms affected by changes as part of
https://issues.apache.org/jira/browse/BEAM-1353


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/e78f8f27
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/e78f8f27
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/e78f8f27

Branch: refs/heads/asf-site
Commit: e78f8f276a6cbf3156e0f7af3dd4ae1d9b92ee7a
Parents: 0d0da02
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri May 12 16:07:19 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Mon May 15 11:28:52 2017 -0700

----------------------------------------------------------------------
 .../pipelines/create-your-pipeline.md           |  4 +-
 .../pipelines/design-your-pipeline.md           | 20 ++++----
 src/documentation/programming-guide.md          | 54 ++++++++++----------
 src/documentation/sdks/java-extensions.md       |  2 +-
 src/get-started/mobile-gaming-example.md        | 20 ++++----
 src/get-started/wordcount-example.md            |  6 +--
 6 files changed, 53 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/pipelines/create-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/create-your-pipeline.md b/src/documentation/pipelines/create-your-pipeline.md
index b765467..cbf7d31 100644
--- a/src/documentation/pipelines/create-your-pipeline.md
+++ b/src/documentation/pipelines/create-your-pipeline.md
@@ -42,7 +42,7 @@ The following example code shows how to `apply` a `TextIO.Read` root transform t
 
 ```java
 PCollection<String> lines = p.apply(
-  "ReadLines", TextIO.Read.from("gs://some/inputData.txt"));
+  "ReadLines", TextIO.read().from("gs://some/inputData.txt"));
 ```
 
 ## Applying Transforms to Process Pipeline Data
@@ -68,7 +68,7 @@ The following example code shows how to `apply` a `TextIO.Write` transform to wr
 ```java
 PCollection<String> filteredWords = ...;
 
-filteredWords.apply("WriteMyFile", TextIO.Write.to("gs://some/outputData.txt"));
+filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));
 ```
 
 ## Running Your Pipeline
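To illustrate the shape of this change outside the diff: the capitalized nested-class entry points (`TextIO.Read.from(...)`, `TextIO.Write.to(...)`) became lowercase factory methods returning configurable transforms. The sketch below mocks that fluent shape without depending on Beam; `TextIOSketch` and its nested classes are stand-ins for illustration, not real Beam types.

```java
// Stand-in illustrating the new factory-method style, TextIO.read().from(...),
// which replaced the removed nested-class style TextIO.Read.from(...).
// TextIOSketch is a mock for illustration only, not real Beam code.
public class TextIOSketch {
    public static final class Read {
        private String path;
        // Each configuration method returns the transform for further chaining.
        public Read from(String path) { this.path = path; return this; }
        public String getPath() { return path; }
    }
    public static final class Write {
        private String path;
        public Write to(String path) { this.path = path; return this; }
        public String getPath() { return path; }
    }
    // Lowercase factory methods replace the old capitalized nested-class entry points.
    public static Read read() { return new Read(); }
    public static Write write() { return new Write(); }
}
```

With this shape, `TextIOSketch.read().from("gs://some/inputData.txt")` mirrors the updated calls in the diff above.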

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/pipelines/design-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md
index c40803c..ce6a734 100644
--- a/src/documentation/pipelines/design-your-pipeline.md
+++ b/src/documentation/pipelines/design-your-pipeline.md
@@ -103,13 +103,7 @@ final TupleTag<String> startsWithATag = new TupleTag<String>(){};
 final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
 
 PCollectionTuple mixedCollection =
-    dbRowCollection.apply(
-        ParDo
-        // Specify main output. In this example, it is the output
-        // with tag startsWithATag.
-        .withOutputTags(startsWithATag,
-        // Specify the output with tag startsWithBTag, as a TupleTagList.
-                        TupleTagList.of(startsWithBTag))
+    dbRowCollection.apply(ParDo
         .of(new DoFn<String, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
@@ -121,8 +115,12 @@ PCollectionTuple mixedCollection =
               c.output(startsWithBTag, c.element());
             }
           }
-        }
-        ));
+        })
+        // Specify main output. In this example, it is the output
+        // with tag startsWithATag.
+        .withOutputTags(startsWithATag,
+        // Specify the output with tag startsWithBTag, as a TupleTagList.
+                        TupleTagList.of(startsWithBTag)));
 
 // Get subset of the output with tag startsWithATag.
 mixedCollection.get(startsWithATag).apply(...);
@@ -159,7 +157,7 @@ mergedCollectionWithFlatten.apply(...);
 
 ## Multiple sources
 
-Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a text file. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders.
+Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a Kafka topic. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders.
 
 <figure id="fig5">
     <img src="{{ site.baseurl }}/images/design-your-pipeline-join.png"
@@ -169,7 +167,7 @@ Figure 5: A pipeline with multiple input sources. See the example code below:
 ```java
 PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
 
-PCollection<KV<String, String>> userOrder = pipeline.apply(TextIO.<KV<String, String>>read()...);
+PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
 
 final TupleTag<String> addressTag = new TupleTag<String>();
 final TupleTag<String> orderTag = new TupleTag<String>();
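The multi-output `ParDo` hunk above moves `.withOutputTags(...)` so it chains after `.of(doFn)` instead of before it. A dependency-free mock of that reordered shape (all names here, including the `String` stand-ins for `DoFn` and `TupleTag`, are illustrative, not real Beam types):

```java
import java.util.ArrayList;
import java.util.List;

// Mock of the reordered multi-output ParDo API: configuration such as
// withOutputTags(...) now chains after of(doFn) rather than preceding it.
public class ParDoSketch {
    public static SingleOutput of(String doFnName) { return new SingleOutput(doFnName); }

    public static final class SingleOutput {
        private final String doFnName;
        private SingleOutput(String doFnName) { this.doFnName = doFnName; }
        // Called after of(...): the main output tag first, then the additional tags.
        public MultiOutput withOutputTags(String mainTag, List<String> additionalTags) {
            List<String> all = new ArrayList<>();
            all.add(mainTag);
            all.addAll(additionalTags);
            return new MultiOutput(doFnName, all);
        }
    }

    public static final class MultiOutput {
        private final String doFnName;
        private final List<String> tags;
        private MultiOutput(String doFnName, List<String> tags) {
            this.doFnName = doFnName;
            this.tags = tags;
        }
        public List<String> getTags() { return tags; }
    }
}
```

Usage mirroring the hunk: `ParDoSketch.of("classifyRows").withOutputTags("startsWithATag", java.util.Arrays.asList("startsWithBTag"))`.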

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index f70e255..d7e37a0 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -191,7 +191,7 @@ public static void main(String[] args) {
 
     // Create the PCollection 'lines' by applying a 'Read' transform.
     PCollection<String> lines = p.apply(
-      "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
+      "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
 }
 ```
 
@@ -479,8 +479,8 @@ PCollection<String> words = ...;
 // Apply a MapElements with an anonymous lambda function to the PCollection words.
 // Save the result as the PCollection wordLengths.
 PCollection<Integer> wordLengths = words.apply(
-  MapElements.via((String word) -> word.length())
-      .withOutputType(new TypeDescriptor<Integer>() {});
+  MapElements.into(TypeDescriptors.integers())
+             .via((String word) -> word.length()));
 ```
 
 ```py
@@ -862,16 +862,18 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc
 
   // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
   PCollection<String> wordsBelowCutOff =
-  words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
-                    .of(new DoFn<String, String>() {
-      public void processElement(ProcessContext c) {
-        String word = c.element();
-        // In our DoFn, access the side input.
-        int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
-        if (word.length() <= lengthCutOff) {
-          c.output(word);
-        }
-  }}));
+  words.apply(ParDo
+      .of(new DoFn<String, String>() {
+          public void processElement(ProcessContext c) {
+            String word = c.element();
+            // In our DoFn, access the side input.
+            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
+            if (word.length() <= lengthCutOff) {
+              c.output(word);
+            }
+          }
+      }).withSideInputs(maxWordLengthCutOffView)
+  );
 ```
 
 ```py
@@ -943,17 +945,16 @@ While `ParDo` always produces a main output `PCollection` (as the return value f
 // to our ParDo. Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple.
 
   PCollectionTuple results =
-      words.apply(
-          ParDo
+      words.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
           // Specify the tag for the main output.
           .withOutputTags(wordsBelowCutOffTag,
           // Specify the tags for the two additional outputs as a TupleTagList.
                           TupleTagList.of(wordLengthsAboveCutOffTag)
-                                      .and(markedWordsTag))
-          .of(new DoFn<String, String>() {
-            // DoFn continues here.
-            ...
-          }
+                                      .and(markedWordsTag)));
 ```
 
 ```py
@@ -1114,7 +1115,7 @@ Read transforms read data from an external source and return a `PCollection` rep
 #### Using a read transform:
 
 ```java
-PCollection<String> lines = p.apply(TextIO.Read.from("gs://some/inputData.txt"));
+PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
 ```
 
 ```py
@@ -1128,7 +1129,7 @@ Write transforms write the data in a `PCollection` to an external data source. Y
 #### Using a Write transform:
 
 ```java
-output.apply(TextIO.Write.to("gs://some/outputData"));
+output.apply(TextIO.write().to("gs://some/outputData"));
 ```
 
 ```py
@@ -1143,7 +1144,7 @@ Many read transforms support reading from multiple input files matching a glob o
 
 ```java
 p.apply(“ReadFromText”,
-    TextIO.Read.from("protocol://my_bucket/path/to/input-*.csv");
+    TextIO.read().from("protocol://my_bucket/path/to/input-*.csv");
 ```
 
 ```py
@@ -1161,7 +1162,7 @@ The following write transform example writes multiple output files to a location
 
 ```java
 records.apply("WriteToText",
-    TextIO.Write.to("protocol://my_bucket/path/to/numbers")
+    TextIO.write().to("protocol://my_bucket/path/to/numbers")
                 .withSuffix(".csv"));
 ```
 
@@ -1563,7 +1564,7 @@ You can allow late data by invoking the `.withAllowedLateness` operation when yo
               .withAllowedLateness(Duration.standardDays(2)));
 ```
 
-When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explictly by applying `Window.withAllowedLateness()` again.
+When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explictly by applying `Window.configure().withAllowedLateness()`.
 
 
 ### Adding timestamps to a PCollection's elements
@@ -1737,7 +1738,7 @@ You set the allowed lateness by using `.withAllowedLateness()` when you set your
   # The Beam SDK for Python does not support triggers.
 ```
 
-This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. If you want to change the allowed lateness later in your pipeline, you can apply `Window.withAllowedLateness()` again, explicitly.
+This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. If you want to change the allowed lateness later in your pipeline, you can apply `Window.configure().withAllowedLateness()` again, explicitly.
 
 
 ### <a name="composite-triggers"></a>Composite Triggers
@@ -1770,6 +1771,7 @@ You can express this pattern using `AfterWatermark.pastEndOfWindow`. For example
 
 ```java
   .apply(Window
+      .configure()
       .triggering(AfterWatermark
            .pastEndOfWindow()
            .withLateFirings(AfterProcessingTime
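The `MapElements` hunk in this file follows the same reordering pattern: the output type is supplied first via `into(...)`, then the element-wise lambda via `via(...)`, replacing the removed `via(...).withOutputType(...)` order. A dependency-free sketch of that shape (`MapElementsSketch` is a mock, and `Class<OutputT>` stands in for Beam's `TypeDescriptor`):

```java
import java.util.function.Function;

// Mock of the new MapElements ordering: into(outputType) first, then via(fn).
// Illustrative stand-in only, not the real Beam MapElements.
public class MapElementsSketch<OutputT> {
    public static <OutputT> MapElementsSketch<OutputT> into(Class<OutputT> outputType) {
        return new MapElementsSketch<>();
    }
    // via(...) now comes last and captures the per-element lambda.
    public <InputT> Function<InputT, OutputT> via(Function<InputT, OutputT> fn) {
        return fn;
    }
}
```

Here `MapElementsSketch.into(Integer.class).via((String w) -> w.length())` mirrors the updated `MapElements.into(TypeDescriptors.integers()).via(...)` call in the diff above.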

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/documentation/sdks/java-extensions.md
----------------------------------------------------------------------
diff --git a/src/documentation/sdks/java-extensions.md b/src/documentation/sdks/java-extensions.md
index a4694af..17a79e7 100644
--- a/src/documentation/sdks/java-extensions.md
+++ b/src/documentation/sdks/java-extensions.md
@@ -55,5 +55,5 @@ PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
 // For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
 PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
     grouped.apply(
-        SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
+        SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));
 ```

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/get-started/mobile-gaming-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/mobile-gaming-example.md b/src/get-started/mobile-gaming-example.md
index 5c97274..9e59274 100644
--- a/src/get-started/mobile-gaming-example.md
+++ b/src/get-started/mobile-gaming-example.md
@@ -107,9 +107,9 @@ public static class ExtractAndSumScore
 
     return gameInfo
       .apply(MapElements
-          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
-          .withOutputType(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
+          .into(
+              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
       .apply(Sum.<String>integersPerKey());
   }
 }
@@ -148,7 +148,7 @@ public static void main(String[] args) throws Exception {
   Pipeline pipeline = Pipeline.create(options);
 
   // Read events from a text file and parse them.
-  pipeline.apply(TextIO.Read.from(options.getInput()))
+  pipeline.apply(TextIO.read().from(options.getInput()))
     .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
     // Extract and sum username/score pairs from the event data.
     .apply("ExtractUserScore", new ExtractAndSumScore("user"))
@@ -314,7 +314,7 @@ public static void main(String[] args) throws Exception {
   final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
 
   // Read 'gaming' events from a text file.
-  pipeline.apply(TextIO.Read.from(options.getInput()))
+  pipeline.apply(TextIO.read().from(options.getInput()))
     // Parse the incoming data.
     .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
 
@@ -601,8 +601,6 @@ public static class CalculateSpammyUsers
     // Filter the user sums using the global mean.
     PCollection<KV<String, Integer>> filtered = sumScores
         .apply("ProcessAndFilter", ParDo
-            // use the derived mean total score as a side input
-            .withSideInputs(globalMeanScore)
             .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
               private final Aggregator<Long, Long> numSpammerUsers =
                 createAggregator("SpammerUsers", new Sum.SumLongFn());
@@ -617,7 +615,9 @@ public static class CalculateSpammyUsers
                   c.output(c.element());
                 }
               }
-            }));
+            })
+            // use the derived mean total score as a side input
+            .withSideInputs(globalMeanScore));
     return filtered;
   }
 }
@@ -635,7 +635,6 @@ rawEvents
       FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
   // Filter out the detected spammer users, using the side input derived above.
   .apply("FilterOutSpammers", ParDo
-          .withSideInputs(spammersView)
           .of(new DoFn<GameActionInfo, GameActionInfo>() {
             @ProcessElement
             public void processElement(ProcessContext c) {
@@ -644,7 +643,8 @@ rawEvents
                 c.output(c.element());
               }
             }
-          }))
+          })
+          .withSideInputs(spammersView))
   // Extract and sum teamname/score pairs from the event data.
   .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
 ```
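The side-input hunks above make the same move for `.withSideInputs(...)`: it now chains after `.of(doFn)`. A minimal mock of that wiring, with `String` names standing in for `DoFn` and `PCollectionView` (illustrative only):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Mock of the reordered side-input wiring: withSideInputs(...) chains after
// of(doFn), as in the updated FilterOutSpammers and ProcessAndFilter snippets.
public class SideInputSketch {
    private final String doFnName;
    private List<String> sideInputs = Collections.emptyList();

    private SideInputSketch(String doFnName) { this.doFnName = doFnName; }

    public static SideInputSketch of(String doFnName) {
        return new SideInputSketch(doFnName);
    }
    // Configuration follows of(...), returning the transform for chaining.
    public SideInputSketch withSideInputs(String... views) {
        this.sideInputs = Arrays.asList(views);
        return this;
    }
    public List<String> getSideInputs() { return sideInputs; }
}
```

Usage mirroring the hunk: `SideInputSketch.of("filterSpammers").withSideInputs("spammersView")`.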

http://git-wip-us.apache.org/repos/asf/beam-site/blob/e78f8f27/src/get-started/wordcount-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md
index 503f930..023086d 100644
--- a/src/get-started/wordcount-example.md
+++ b/src/get-started/wordcount-example.md
@@ -96,7 +96,7 @@ The Minimal WordCount pipeline contains five transforms:
 1.  A text file `Read` transform is applied to the Pipeline object itself, and produces a `PCollection` as output. Each element in the output PCollection represents one line of text from the input file. This example uses input data stored in a publicly accessible Google Cloud Storage bucket ("gs://").
 
     ```java
-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
     ```
 
     ```py
@@ -157,7 +157,7 @@ The Minimal WordCount pipeline contains five transforms:
 5.  A text file write transform. This transform takes the final `PCollection` of formatted Strings as input and writes each element to an output text file. Each element in the input `PCollection` represents one line of text in the resulting output file.
 
     ```java
-    .apply(TextIO.Write.to("wordcounts"));
+    .apply(TextIO.write().to("wordcounts"));
     ```
 
     ```py
@@ -398,7 +398,7 @@ public static void main(String[] args) throws IOException {
     Pipeline pipeline = Pipeline.create(options);
 
     PCollection<String> input = pipeline
-      .apply(TextIO.Read.from(options.getInputFile()))
+      .apply(TextIO.read().from(options.getInputFile()))
 
 ```
 

