From: altay@apache.org
To: commits@beam.apache.org
Date: Wed, 29 Mar 2017 23:45:03 -0000
Subject: [1/2] beam git commit: Add first two mobile gaming examples to Python.

Repository: beam
Updated Branches:
  refs/heads/master 5d460d2e9 -> 4d633bc5a


Add first two mobile gaming examples to Python.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d43391da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d43391da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d43391da

Branch: refs/heads/master
Commit: d43391da7ed778baee0b03fdd5e1171c6516d4f5
Parents: 5d460d2
Author: Ahmet Altay
Authored: Tue Mar 28 17:45:11 2017 -0700
Committer: Ahmet Altay
Committed: Wed Mar 29 16:44:51 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/README.md            |  69 +++++
 .../examples/complete/game/__init__.py          |  16 +
 .../examples/complete/game/hourly_team_score.py | 294 +++++++++++++++++++
 .../complete/game/hourly_team_score_test.py     |  52 ++++
 .../examples/complete/game/user_score.py        | 219 ++++++++++++++
 .../examples/complete/game/user_score_test.py   |  49 ++++
 6 files changed, 699 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/README.md b/sdks/python/apache_beam/examples/complete/game/README.md
new file mode 100644
index 0000000..39677e4
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/README.md
@@ -0,0 +1,69 @@
+
+# 'Gaming' examples
+
+This directory holds a series of example Apache Beam pipelines in a simple
+'mobile gaming' domain. Each pipeline successively introduces new concepts.
+
+In the gaming scenario, many users play, as members of different teams, over
+the course of a day, and their actions are logged for processing. Some of the
+logged game events may be late-arriving, if users play on mobile devices and
+go transiently offline for a period.
+
+The scenario includes not only "regular" users, but "robot users", which have
+a higher click rate than the regular users, and may move from team to team.
+
+The first two pipelines in the series use pre-generated batch data samples.
+
+All of these pipelines write their results to Google BigQuery table(s).
+
+## The pipelines in the 'gaming' series
+
+### user_score
+
+The first pipeline in the series is `user_score`. This pipeline does batch
+processing of data collected from gaming events. It calculates the sum of
+scores per user, over an entire batch of gaming data (collected, say, for
+each day). The batch processing will not include any late data that arrives
+after the day's cutoff point.
+
+### hourly_team_score
+
+The next pipeline in the series is `hourly_team_score`. This pipeline also
+processes data collected from gaming events in batch. It builds on
+`user_score`, but uses
+[fixed windows](https://beam.apache.org/documentation/programming-guide/#windowing),
+by default an hour in duration. It calculates the sum of scores per team, for
+each window, and optionally lets you specify two timestamps: data before the
+first or after the second is filtered out. This allows a model where late
+data collected after the intended analysis window can be included in the
+analysis, and any late-arriving data prior to the beginning of the analysis
+window can be removed as well.
+
+By using windowing and adding element timestamps, we can do finer-grained
+analysis than with the `user_score` pipeline — we're now tracking scores for
+each hour rather than over the course of a whole day. However, our batch
+processing is high-latency, in that we don't get results from plays at the
+beginning of the batch's time period until the complete batch is processed.
+
+## Viewing the results in BigQuery
+
+All of the pipelines write their results to BigQuery. `user_score` and
+`hourly_team_score` each write one table. Each pipeline has a default table
+name that you can override when you launch it, for example if a table by that
+name already exists.
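
Before the full sources below, here is a minimal runnable sketch (not part of
the commit) of the core pattern both pipelines share: attach event-time
timestamps, optionally apply fixed windows, and sum scores per key. The events
are made up for illustration, and the snippet targets the current Beam Python
API, which may differ from the 2017-era module paths used in this commit.

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue

# Toy (team, score, event_time_in_seconds) records; values are illustrative.
events = [
    ('team1', 18, 1447686663),
    ('team2', 2, 1447690263),
    ('team1', 14, 1447697463),
]

with beam.Pipeline() as p:
  (p  # pylint: disable=expression-not-assigned
   | beam.Create(events)
   # Attach an event-time timestamp to each element.
   | beam.Map(lambda e: TimestampedValue(e, e[2]))
   # Hour-long fixed windows, as hourly_team_score uses by default.
   | beam.WindowInto(FixedWindows(60 * 60))
   # Per-team sum within each window; user_score does the same keyed by
   # user, without windowing.
   | beam.Map(lambda e: (e[0], e[1]))
   | beam.CombinePerKey(sum)
   | beam.Map(print))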

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/__init__.py b/sdks/python/apache_beam/examples/complete/game/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#


http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
new file mode 100644
index 0000000..6ddf014
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Second in a series of four pipelines that tell a story in a 'gaming' domain.
+
+In addition to the concepts introduced in `user_score`, new concepts include:
+windowing and element timestamps; use of `Filter`.
+
+This pipeline processes data collected from gaming events in batch, building
+on `user_score` but using fixed windows. It calculates the sum of scores per
+team, for each window, optionally allowing specification of two timestamps
+before and after which data is filtered out. This allows a model where late
+data collected after the intended analysis window can be included, and any
+late-arriving data prior to the beginning of the analysis window can be
+removed as well. By using windowing and adding element timestamps, we can do
+finer-grained analysis than with the `user_score` pipeline. However, our
+batch processing is high-latency, in that we don't get results from plays at
+the beginning of the batch's time period until the batch is processed.
+
+To execute this pipeline using the static example input data, specify the
+`--dataset=YOUR-DATASET` flag along with other runner-specific flags. (Note:
+the BigQuery dataset you specify must already exist.)
+
+Optionally include the `--input` argument to specify a batch input file. To
+indicate a time after which the data should be filtered out, include the
+`--stop_min` arg. E.g., `--stop_min=2015-10-18-23-59` indicates that any data
+timestamped after 23:59 PST on 2015-10-18 should not be included in the
+analysis. To indicate a time before which data should be filtered out,
+include the `--start_min` arg. If you're using the default input
+"gs://dataflow-samples/game/gaming_data*.csv", then
+`--start_min=2015-11-16-16-10 --stop_min=2015-11-17-16-10` are good values.
+"""
+""" + +from __future__ import absolute_import + +import argparse +import datetime +import logging + +import apache_beam as beam +from apache_beam import typehints +from apache_beam.io import ReadFromText +from apache_beam.metrics import Metrics +from apache_beam.transforms.window import FixedWindows +from apache_beam.transforms.window import TimestampedValue +from apache_beam.typehints import with_input_types +from apache_beam.typehints import with_output_types +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions + + +class ParseEventFn(beam.DoFn): + """Parses the raw game event info into GameActionInfo tuples. + + Each event line has the following format: + username,teamname,score,timestamp_in_ms,readable_time + + e.g.: + user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 + + The human-readable time string is not used here. + """ + def __init__(self): + super(ParseEventFn, self).__init__() + self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors') + + def process(self, element): + components = element.split(',') + try: + user = components[0].strip() + team = components[1].strip() + score = int(components[2].strip()) + timestamp = int(components[3].strip()) + yield {'user': user, 'team': team, 'score': score, 'timestamp': timestamp} + except: # pylint: disable=bare-except + # Log and count parse errors. + self.num_parse_errors.inc() + logging.info('Parse error on %s.', element) + + +@with_input_types(ints=typehints.Iterable[int]) +@with_output_types(int) +def sum_ints(ints): + return sum(ints) + + +class ExtractAndSumScore(beam.PTransform): + """A transform to extract key/score information and sum the scores. + + The constructor argument `field` determines whether 'team' or 'user' info is + extracted. + """ + def __init__(self, field): + super(ExtractAndSumScore, self).__init__() + self.field = field + + def expand(self, pcoll): + return (pcoll + | beam.Map(lambda info: (info[self.field], info['score'])) + | beam.CombinePerKey(sum_ints)) + + +def configure_bigquery_write(): + + def window_start_format(element, window): + dt = datetime.datetime.fromtimestamp(int(window.start)) + return dt.strftime('%Y-%m-%d %H:%M:%S') + + return [ + ('team', 'STRING', lambda e, w: e[0]), + ('total_score', 'INTEGER', lambda e, w: e[1]), + ('window_start', 'STRING', window_start_format), + ] + + +class WriteWindowedToBigQuery(beam.PTransform): + """Generate, format, and write BigQuery table row information. + + This class may be used for writes that require access to the window + information. + """ + def __init__(self, table_name, dataset, field_info): + """Initializes the transform. + + Args: + table_name: Name of the BigQuery table to use. + dataset: Name of the dataset to use. + field_info: List of tuples that holds information about output table field + definitions. The tuples are in the + (field_name, field_type, field_fn) format, where field_name is + the name of the field, field_type is the BigQuery type of the + field and field_fn is a lambda function to generate the field + value from the element. 
+ """ + super(WriteWindowedToBigQuery, self).__init__() + self.table_name = table_name + self.dataset = dataset + self.field_info = field_info + + def get_schema(self): + """Build the output table schema.""" + return ', '.join( + '%s:%s' % (entry[0], entry[1]) for entry in self.field_info) + + def get_table(self, pipeline): + """Utility to construct an output table reference.""" + project = pipeline.options.view_as(GoogleCloudOptions).project + return '%s:%s.%s' % (project, self.dataset, self.table_name) + + class BuildRowFn(beam.DoFn): + """Convert each key/score pair into a BigQuery TableRow as specified.""" + def __init__(self, field_info): + super(WriteWindowedToBigQuery.BuildRowFn, self).__init__() + self.field_info = field_info + + def process(self, element, window=beam.DoFn.WindowParam): + row = {} + for entry in self.field_info: + row[entry[0]] = entry[2](element, window) + yield row + + def expand(self, pcoll): + table = self.get_table(pcoll.pipeline) + return ( + pcoll + | 'ConvertToRow' >> beam.ParDo( + WriteWindowedToBigQuery.BuildRowFn(self.field_info)) + | beam.io.Write(beam.io.BigQuerySink( + table, + schema=self.get_schema(), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) + + +def string_to_timestamp(datetime_str): + dt = datetime.datetime.strptime(datetime_str, '%Y-%m-%d-%H-%M') + epoch = datetime.datetime.utcfromtimestamp(0) + return (dt - epoch).total_seconds() * 1000.0 + + +class HourlyTeamScore(beam.PTransform): + def __init__(self, start_min, stop_min, window_duration): + super(HourlyTeamScore, self).__init__() + self.start_min = start_min + self.stop_min = stop_min + self.window_duration = window_duration + + def expand(self, pcoll): + start_min_filter = string_to_timestamp(self.start_min) + end_min_filter = string_to_timestamp(self.stop_min) + + return ( + pcoll + | 'ParseGameEvent' >> beam.ParDo(ParseEventFn()) + # Filter out data before and after the given times so that it is not + # included in the calculations. As we collect data in batches (say, by + # day), the batch for the day that we want to analyze could potentially + # include some late-arriving data from the previous day. If so, we want + # to weed it out. Similarly, if we include data from the following day + # (to scoop up late-arriving events from the day we're analyzing), we + # need to weed out events that fall after the time period we want to + # analyze. + | 'FilterStartTime' >> beam.Filter( + lambda element: element['timestamp'] > start_min_filter) + | 'FilterEndTime' >> beam.Filter( + lambda element: element['timestamp'] < end_min_filter) + # Add an element timestamp based on the event log, and apply fixed + # windowing. + # Convert element['timestamp'] into seconds as expected by + # TimestampedValue. + | 'AddEventTimestamps' >> beam.Map( + lambda element: TimestampedValue( + element, element['timestamp'] / 1000.0)) + # Convert window_duration into seconds as expected by FixedWindows. + | 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows( + size=self.window_duration * 60)) + # Extract and sum teamname/score pairs from the event data. + | 'ExtractTeamScore' >> ExtractAndSumScore('team')) + + +def run(argv=None): + """Main entry point; defines and runs the hourly_team_score pipeline.""" + parser = argparse.ArgumentParser() + + # The default maps to two large Google Cloud Storage files (each ~12GB) + # holding two subsequent day's worth (roughly) of data. 
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/game/gaming_data*.csv',
+                      help='Path to the data file(s) containing game data.')
+  parser.add_argument('--dataset',
+                      dest='dataset',
+                      required=True,
+                      help='BigQuery Dataset to write tables to. '
+                      'Must already exist.')
+  parser.add_argument('--table_name',
+                      dest='table_name',
+                      default='hourly_team_score',
+                      help='The BigQuery table name. Should not already '
+                      'exist.')
+  parser.add_argument('--window_duration',
+                      type=int,
+                      default=60,
+                      help='Numeric value of fixed window duration, in '
+                      'minutes.')
+  parser.add_argument('--start_min',
+                      dest='start_min',
+                      default='1970-01-01-00-00',
+                      help='String representation of the first minute after '
+                      'which to generate results, in the format: '
+                      'yyyy-MM-dd-HH-mm. Any input data timestamped '
+                      'prior to that minute won\'t be included in the '
+                      'sums.')
+  parser.add_argument('--stop_min',
+                      dest='stop_min',
+                      default='2100-01-01-00-00',
+                      help='String representation of the first minute for '
+                      'which to not generate results, in the format: '
+                      'yyyy-MM-dd-HH-mm. Any input data timestamped '
+                      'after that minute won\'t be included in the '
+                      'sums.')
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+
+  (p  # pylint: disable=expression-not-assigned
+   | ReadFromText(known_args.input)
+   | HourlyTeamScore(
+       known_args.start_min, known_args.stop_min, known_args.window_duration)
+   | WriteWindowedToBigQuery(
+       known_args.table_name, known_args.dataset, configure_bigquery_write()))
+
+  result = p.run()
+  result.wait_until_finish()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
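
Two small worked examples (illustrative, not part of the commit) of the
helpers defined above. `string_to_timestamp` turns the `yyyy-MM-dd-HH-mm`
flag format into UTC epoch milliseconds, and `get_schema` joins the
`field_info` tuples into the schema string handed to `BigQuerySink`:

import datetime

def string_to_timestamp(datetime_str):
  # Same logic as string_to_timestamp in hourly_team_score.py above.
  dt = datetime.datetime.strptime(datetime_str, '%Y-%m-%d-%H-%M')
  epoch = datetime.datetime.utcfromtimestamp(0)
  return (dt - epoch).total_seconds() * 1000.0

# The docstring's suggested start flag, as UTC epoch milliseconds.
print(string_to_timestamp('2015-11-16-16-10'))  # 1447690200000.0

# The schema string built from configure_bigquery_write()'s field_info
# (field functions elided), mirroring get_schema above.
field_info = [('team', 'STRING', None),
              ('total_score', 'INTEGER', None),
              ('window_start', 'STRING', None)]
print(', '.join('%s:%s' % (f[0], f[1]) for f in field_info))
# team:STRING, total_score:INTEGER, window_start:STRING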

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
new file mode 100644
index 0000000..1d93c34
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test for the hourly_team_score example."""
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.examples.complete.game import hourly_team_score
+
+
+class HourlyTeamScoreTest(unittest.TestCase):
+
+  SAMPLE_DATA = [
+      'user1_team1,team1,18,1447686663000,2015-11-16 15:11:03.921',
+      'user1_team1,team1,18,1447690263000,2015-11-16 16:11:03.921',
+      'user2_team2,team2,2,1447690263000,2015-11-16 16:11:03.955',
+      'user3_team3,team3,8,1447690263000,2015-11-16 16:11:03.955',
+      'user4_team3,team3,5,1447690263000,2015-11-16 16:11:03.959',
+      'user1_team1,team1,14,1447697463000,2015-11-16 18:11:03.955',
+  ]
+
+  def test_hourly_team_score(self):
+    with TestPipeline() as p:
+      result = (p
+                | beam.Create(HourlyTeamScoreTest.SAMPLE_DATA)
+                | hourly_team_score.HourlyTeamScore(
+                    start_min='2015-11-16-15-20',
+                    stop_min='2015-11-16-17-20',
+                    window_duration=60))
+      beam.assert_that(result, beam.equal_to([
+          ('team1', 18), ('team2', 2), ('team3', 13)]))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
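
A quick sanity check of the expected output above: with
`start_min='2015-11-16-15-20'` and `stop_min='2015-11-16-17-20'`, the first
sample event (timestamp 1447686663000, i.e. 15:11:03 UTC) falls before the
start filter and the last one (1447697463000, i.e. 18:11:03 UTC) falls after
the stop filter, so both are dropped. The surviving events sum to 18 for
team1, 2 for team2, and 8 + 5 = 13 for team3, which is exactly what the test
asserts.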
+""" + +from __future__ import absolute_import + +import argparse +import logging + +import apache_beam as beam +from apache_beam import typehints +from apache_beam.io import ReadFromText +from apache_beam.metrics import Metrics +from apache_beam.typehints import with_input_types +from apache_beam.typehints import with_output_types +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import PipelineOptions + + +class ParseEventFn(beam.DoFn): + """Parses the raw game event info into GameActionInfo tuples. + + Each event line has the following format: + username,teamname,score,timestamp_in_ms,readable_time + + e.g.: + user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 + + The human-readable time string is not used here. + """ + def __init__(self): + super(ParseEventFn, self).__init__() + self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors') + + def process(self, element): + components = element.split(',') + try: + user = components[0].strip() + team = components[1].strip() + score = int(components[2].strip()) + timestamp = int(components[3].strip()) + yield {'user': user, 'team': team, 'score': score, 'timestamp': timestamp} + except: # pylint: disable=bare-except + # Log and count parse errors. + self.num_parse_errors.inc() + logging.info('Parse error on %s.', element) + + +@with_input_types(ints=typehints.Iterable[int]) +@with_output_types(int) +def sum_ints(ints): + return sum(ints) + + +class ExtractAndSumScore(beam.PTransform): + """A transform to extract key/score information and sum the scores. + + The constructor argument `field` determines whether 'team' or 'user' info is + extracted. + """ + def __init__(self, field): + super(ExtractAndSumScore, self).__init__() + self.field = field + + def expand(self, pcoll): + return (pcoll + | beam.Map(lambda info: (info[self.field], info['score'])) + | beam.CombinePerKey(sum_ints)) + + +def configure_bigquery_write(): + return [ + ('user', 'STRING', lambda e: e[0]), + ('total_score', 'INTEGER', lambda e: e[1]), + ] + + +class WriteToBigQuery(beam.PTransform): + """Generate, format, and write BigQuery table row information. + + Use provided information about the field names and types, as well as lambda + functions that describe how to generate their values. + """ + + def __init__(self, table_name, dataset, field_info): + """Initializes the transform. + + Args: + table_name: Name of the BigQuery table to use. + dataset: Name of the dataset to use. + field_info: List of tuples that holds information about output table field + definitions. The tuples are in the + (field_name, field_type, field_fn) format, where field_name is + the name of the field, field_type is the BigQuery type of the + field and field_fn is a lambda function to generate the field + value from the element. 
+ """ + super(WriteToBigQuery, self).__init__() + self.table_name = table_name + self.dataset = dataset + self.field_info = field_info + + def get_schema(self): + """Build the output table schema.""" + return ', '.join( + '%s:%s' % (entry[0], entry[1]) for entry in self.field_info) + + def get_table(self, pipeline): + """Utility to construct an output table reference.""" + project = pipeline.options.view_as(GoogleCloudOptions).project + return '%s:%s.%s' % (project, self.dataset, self.table_name) + + class BuildRowFn(beam.DoFn): + """Convert each key/score pair into a BigQuery TableRow as specified.""" + def __init__(self, field_info): + super(WriteToBigQuery.BuildRowFn, self).__init__() + self.field_info = field_info + + def process(self, element): + row = {} + for entry in self.field_info: + row[entry[0]] = entry[2](element) + yield row + + def expand(self, pcoll): + table = self.get_table(pcoll.pipeline) + return ( + pcoll + | 'ConvertToRow' >> beam.ParDo( + WriteToBigQuery.BuildRowFn(self.field_info)) + | beam.io.Write(beam.io.BigQuerySink( + table, + schema=self.get_schema(), + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))) + + +class UserScore(beam.PTransform): + def __init__(self): + super(UserScore, self).__init__() + + def expand(self, pcoll): + return (pcoll + | 'ParseGameEvent' >> beam.ParDo(ParseEventFn()) + # Extract and sum username/score pairs from the event data. + | 'ExtractUserScore' >> ExtractAndSumScore('user')) + + +def run(argv=None): + """Main entry point; defines and runs the user_score pipeline.""" + parser = argparse.ArgumentParser() + + # The default maps to two large Google Cloud Storage files (each ~12GB) + # holding two subsequent day's worth (roughly) of data. + parser.add_argument('--input', + dest='input', + default='gs://dataflow-samples/game/gaming_data*.csv', + help='Path to the data file(s) containing game data.') + parser.add_argument('--dataset', + dest='dataset', + required=True, + help='BigQuery Dataset to write tables to. ' + 'Must already exist.') + parser.add_argument('--table_name', + dest='table_name', + default='user_score', + help='The BigQuery table name. Should not already exist.') + known_args, pipeline_args = parser.parse_known_args(argv) + + pipeline_options = PipelineOptions(pipeline_args) + p = beam.Pipeline(options=pipeline_options) + + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) # Read events from a file and parse them. + | UserScore() + | WriteToBigQuery( + known_args.table_name, known_args.dataset, configure_bigquery_write())) + + result = p.run() + result.wait_until_finish() + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/user_score_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py new file mode 100644 index 0000000..6ed1462 --- /dev/null +++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
new file mode 100644
index 0000000..6ed1462
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test for the user_score example."""
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.examples.complete.game import user_score
+
+
+class UserScoreTest(unittest.TestCase):
+
+  SAMPLE_DATA = [
+      'user1_team1,team1,18,1447686663000,2015-11-16 15:11:03.921',
+      'user1_team1,team1,18,1447690263000,2015-11-16 16:11:03.921',
+      'user2_team2,team2,2,1447690263000,2015-11-16 16:11:03.955',
+      'user3_team3,team3,8,1447690263000,2015-11-16 16:11:03.955',
+      'user4_team3,team3,5,1447690263000,2015-11-16 16:11:03.959',
+      'user1_team1,team1,14,1447697463000,2015-11-16 18:11:03.955',
+  ]
+
+  def test_user_score(self):
+    with TestPipeline() as p:
+      result = (
+          p | beam.Create(UserScoreTest.SAMPLE_DATA) | user_score.UserScore())
+      beam.assert_that(result, beam.equal_to([
+          ('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8),
+          ('user4_team3', 5)]))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
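
The expected sums follow directly from the sample data: `user1_team1` appears
three times (18 + 18 + 14 = 50), while `user2_team2`, `user3_team3`, and
`user4_team3` each appear once. No events are filtered out here, since
`user_score` applies no time window or timestamp filters.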