From: davor@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm
Date: Fri, 04 Mar 2016 18:11:34 -0000
Message-Id: <4cadbd58b7114c9daa2ce03bfa6aa468@git.apache.org>
In-Reply-To: <844cc05de9854f8599738d83d11f8ce1@git.apache.org>
References: <844cc05de9854f8599738d83d11f8ce1@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [39/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
index ad5b53a..90073c1 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
@@ -40,93 +40,93 @@ import java.util.Arrays;
  * Session window test
  */
 public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
-	protected String resultPath;
-
-	public TopWikipediaSessionsITCase(){
-	}
-
-	static final String[] EXPECTED_RESULT = new String[] {
-			"user: user1 value:3",
-			"user: user1 value:1",
-			"user: user2 value:4",
-			"user: user2 value:6",
-			"user: user3 value:7",
-			"user: user3 value:2"
-	};
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-
-		Pipeline p = FlinkTestPipeline.createForStreaming();
-
-		Long now = (System.currentTimeMillis() + 10000) / 1000;
-
-		PCollection<KV<String, Long>> output =
-			p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now)
-					.set("contributor_username", "user3"))))
-
-
-
-		.apply(ParDo.of(new DoFn<TableRow, String>() {
-			@Override
-			public void processElement(ProcessContext c) throws Exception {
-				TableRow row = c.element();
-				long timestamp = (Integer) row.get("timestamp");
-				String userName = (String) row.get("contributor_username");
-				if (userName != null) {
-					// Sets the timestamp field to be used in windowing.
-					c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-				}
-			}
-		}))
-
-		.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-
-		.apply(Count.<String>perElement());
-
-		PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
-			@Override
-			public void processElement(ProcessContext c) throws Exception {
-				KV<String, Long> el = c.element();
-				String out = "user: " + el.getKey() + " value:" + el.getValue();
-				c.output(out);
-			}
-		}));
-
-		format.apply(TextIO.Write.to(resultPath));
-
-		p.run();
-	}
+  protected String resultPath;
+
+  public TopWikipediaSessionsITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "user: user1 value:3",
+      "user: user1 value:1",
+      "user: user2 value:4",
+      "user: user2 value:6",
+      "user: user3 value:7",
+      "user: user3 value:2"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    Long now = (System.currentTimeMillis() + 10000) / 1000;
+
+    PCollection<KV<String, Long>> output =
+      p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
+          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
+          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
+          ("contributor_username", "user2"), new TableRow().set("timestamp", now)
+          .set("contributor_username", "user3"))))
+
+
+
+    .apply(ParDo.of(new DoFn<TableRow, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        TableRow row = c.element();
+        long timestamp = (Integer) row.get("timestamp");
+        String userName = (String) row.get("contributor_username");
+        if (userName != null) {
+          // Sets the timestamp field to be used in windowing.
+          c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+        }
+      }
+    }))
+
+    .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+
+    .apply(Count.<String>perElement());
+
+    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        KV<String, Long> el = c.element();
+        String out = "user: " + el.getKey() + " value:" + el.getValue();
+        c.output(out);
+      }
+    }));
+
+    format.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 }
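[Note on the test above: it leans on two SDK mechanisms. c.outputWithTimestamp(...) assigns each element an event-time timestamp, and Window.<String>into(Sessions.withGapDuration(...)) groups each key's elements into sessions that close after one minute without activity, so Count.<String>perElement() then counts per user *per session*. Below is a minimal sketch of that pattern outside the test harness; the class name SessionWindowSketch and the hard-coded input are invented for illustration, and Create.timestamped is assumed to be available in the same com.google.cloud.dataflow.sdk used by this file.]

    import java.util.Arrays;

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Count;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.values.TimestampedValue;

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Hypothetical sketch, not part of this commit.
    public class SessionWindowSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        // Three edits by one user, already timestamped; the third arrives
        // more than a minute after the second, so it opens a new session.
        PCollection<String> edits = p.apply(Create.timestamped(Arrays.asList(
            TimestampedValue.of("user1", new Instant(0L)),
            TimestampedValue.of("user1", new Instant(30000L)),
            TimestampedValue.of("user1", new Instant(300000L)))));

        // Same windowing as the test: per-key sessions with a one-minute gap,
        // then a count per element. Expected counts: (user1, 2) and (user1, 1).
        PCollection<KV<String, Long>> counts = edits
            .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
            .apply(Count.<String>perElement());

        p.run();
      }
    }

[This is also why EXPECTED_RESULT above has two entries per user: each entry is one session's count, not a global total.]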

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
index aa5623d..b1ccee4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
@@ -38,121 +38,121 @@ import com.google.cloud.dataflow.sdk.values.TupleTag;
  */
 public class JoinExamples {
-	// A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
-	private static final String GDELT_EVENTS_TABLE =
-			"clouddataflow-readonly:samples.gdelt_sample";
-	// A table that maps country codes to country names.
-	private static final String COUNTRY_CODES =
-			"gdelt-bq:full.crosswalk_geocountrycodetohuman";
-
-	/**
-	 * Join two collections, using country code as the key.
-	 */
-	public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
-			PCollection<TableRow> countryCodes) throws Exception {
-
-		final TupleTag<String> eventInfoTag = new TupleTag<>();
-		final TupleTag<String> countryInfoTag = new TupleTag<>();
-
-		// transform both input collections to tuple collections, where the keys are country
-		// codes in both cases.
-		PCollection<KV<String, String>> eventInfo = eventsTable.apply(
-				ParDo.of(new ExtractEventDataFn()));
-		PCollection<KV<String, String>> countryInfo = countryCodes.apply(
-				ParDo.of(new ExtractCountryInfoFn()));
-
-		// country code 'key' -> CGBKR (<event info>, <country name>)
-		PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
-				.of(eventInfoTag, eventInfo)
-				.and(countryInfoTag, countryInfo)
-				.apply(CoGroupByKey.<String>create());
-
-		// Process the CoGbkResult elements generated by the CoGroupByKey transform.
-		// country code 'key' -> string of <event info>, <country name>
-		PCollection<KV<String, String>> finalResultCollection =
-				kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
-					@Override
-					public void processElement(ProcessContext c) {
-						KV<String, CoGbkResult> e = c.element();
-						CoGbkResult val = e.getValue();
-						String countryCode = e.getKey();
-						String countryName;
-						countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
-						for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
-							// Generate a string that combines information from both collection values
-							c.output(KV.of(countryCode, "Country name: " + countryName
-									+ ", Event info: " + eventInfo));
-						}
-					}
-				}));
-
-		// write to GCS
-		return finalResultCollection
-				.apply(ParDo.of(new DoFn<KV<String, String>, String>() {
-					@Override
-					public void processElement(ProcessContext c) {
-						String outputstring = "Country code: " + c.element().getKey()
-								+ ", " + c.element().getValue();
-						c.output(outputstring);
-					}
-				}));
-	}
-
-	/**
-	 * Examines each row (event) in the input table. Output a KV with the key the country
-	 * code of the event, and the value a string encoding event information.
-	 */
-	static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
-		@Override
-		public void processElement(ProcessContext c) {
-			TableRow row = c.element();
-			String countryCode = (String) row.get("ActionGeo_CountryCode");
-			String sqlDate = (String) row.get("SQLDATE");
-			String actor1Name = (String) row.get("Actor1Name");
-			String sourceUrl = (String) row.get("SOURCEURL");
-			String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
-			c.output(KV.of(countryCode, eventInfo));
-		}
-	}
-
-
-	/**
-	 * Examines each row (country info) in the input table. Output a KV with the key the country
-	 * code, and the value the country name.
-	 */
-	static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
-		@Override
-		public void processElement(ProcessContext c) {
-			TableRow row = c.element();
-			String countryCode = (String) row.get("FIPSCC");
-			String countryName = (String) row.get("HumanName");
-			c.output(KV.of(countryCode, countryName));
-		}
-	}
-
-
-	/**
-	 * Options supported by {@link JoinExamples}.
-	 * <p>
-	 * Inherits standard configuration options.
-	 */
-	private interface Options extends PipelineOptions {
-		@Description("Path of the file to write to")
-		@Validation.Required
-		String getOutput();
-		void setOutput(String value);
-	}
-
-	public static void main(String[] args) throws Exception {
-		Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-		Pipeline p = Pipeline.create(options);
-		// the following two 'applys' create multiple inputs to our pipeline, one for each
-		// of our two input sources.
-		PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
-		PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
-		PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
-		formattedResults.apply(TextIO.Write.to(options.getOutput()));
-		p.run();
-	}
+  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+  private static final String GDELT_EVENTS_TABLE =
+      "clouddataflow-readonly:samples.gdelt_sample";
+  // A table that maps country codes to country names.
+  private static final String COUNTRY_CODES =
+      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
+
+  /**
+   * Join two collections, using country code as the key.
+   */
+  public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
+      PCollection<TableRow> countryCodes) throws Exception {
+
+    final TupleTag<String> eventInfoTag = new TupleTag<>();
+    final TupleTag<String> countryInfoTag = new TupleTag<>();
+
+    // transform both input collections to tuple collections, where the keys are country
+    // codes in both cases.
+    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
+        ParDo.of(new ExtractCountryInfoFn()));
+
+    // country code 'key' -> CGBKR (<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(eventInfoTag, eventInfo)
+        .and(countryInfoTag, countryInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<String, CoGbkResult> e = c.element();
+            CoGbkResult val = e.getValue();
+            String countryCode = e.getKey();
+            String countryName;
+            countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
+            for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
+              // Generate a string that combines information from both collection values
+              c.output(KV.of(countryCode, "Country name: " + countryName
+                  + ", Event info: " + eventInfo));
+            }
+          }
+        }));
+
+    // write to GCS
+    return finalResultCollection
+        .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String outputstring = "Country code: " + c.element().getKey()
+                + ", " + c.element().getValue();
+            c.output(outputstring);
+          }
+        }));
+  }
+
+  /**
+   * Examines each row (event) in the input table. Output a KV with the key the country
+   * code of the event, and the value a string encoding event information.
+   */
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("ActionGeo_CountryCode");
+      String sqlDate = (String) row.get("SQLDATE");
+      String actor1Name = (String) row.get("Actor1Name");
+      String sourceUrl = (String) row.get("SOURCEURL");
+      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
+      c.output(KV.of(countryCode, eventInfo));
+    }
+  }
+
+
+  /**
+   * Examines each row (country info) in the input table. Output a KV with the key the country
+   * code, and the value the country name.
+   */
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("FIPSCC");
+      String countryName = (String) row.get("HumanName");
+      c.output(KV.of(countryCode, countryName));
+    }
+  }
+
+
+  /**
+   * Options supported by {@link JoinExamples}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    // the following two 'applys' create multiple inputs to our pipeline, one for each
+    // of our two input sources.
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
+    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    p.run();
+  }
 }
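[Note on JoinExamples: it is a CoGroupByKey join. Both inputs are keyed by country code, bundled into a KeyedPCollectionTuple under one TupleTag each, and the per-key CoGbkResult is unpacked with getOnly/getAll. Below is a stripped-down sketch of that join with small in-memory inputs standing in for the two BigQuery tables; the class name CoGroupByKeySketch and the sample rows are invented for illustration.]

    import java.util.Arrays;

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
    import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
    import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.values.TupleTag;

    // Hypothetical sketch, not part of this commit.
    public class CoGroupByKeySketch {

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        // Two inputs keyed by country code, standing in for the BigQuery tables.
        PCollection<KV<String, String>> events = p.apply(Create.of(Arrays.asList(
            KV.of("US", "Date: x, Actor1: y, url: z"),
            KV.of("FR", "Date: a, Actor1: b, url: c"))));
        PCollection<KV<String, String>> names = p.apply(Create.of(Arrays.asList(
            KV.of("US", "United States"))));  // no row for FR on purpose

        final TupleTag<String> eventTag = new TupleTag<>();
        final TupleTag<String> nameTag = new TupleTag<>();

        // One CoGbkResult per key, holding every value from both inputs.
        PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
            .of(eventTag, events)
            .and(nameTag, names)
            .apply(CoGroupByKey.<String>create());

        joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
          @Override
          public void processElement(ProcessContext c) {
            CoGbkResult result = c.element().getValue();
            // getOnly(tag, default) plays the role of the "Kostas" fallback above:
            // FR has no name row, so it falls back to the default value.
            String countryName = result.getOnly(nameTag, "unknown");
            for (String eventInfo : result.getAll(eventTag)) {
              c.output("Country name: " + countryName + ", Event info: " + eventInfo);
            }
          }
        }));

        p.run();
      }
    }

[Because getAll drives the loop over events and getOnly supplies a default when the name side is missing, the join behaves like a left outer join from events to country names, which is exactly what the "Kostas" default buys JoinExamples.]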