beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3824) BQ sink fails on Direct Runner for 2.4.0 RC2
Date Sat, 10 Mar 2018 03:18:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3824?focusedWorklogId=79161&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79161
]

ASF GitHub Bot logged work on BEAM-3824:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Mar/18 03:17
            Start Date: 10/Mar/18 03:17
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #4846: [BEAM-3824] Revert #4666 "Use beam.io.WriteToBigQuery()"
URL: https://github.com/apache/beam/pull/4846
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be visible, since GitHub hides the diff after merge):

diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index 32f6f15e3a8..1f13ed180f6 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -163,6 +163,43 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
     }
 
 
+class WriteToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information."""
+  def __init__(self, table_name, dataset, schema):
+    """Initializes the transform.
+    Args:
+      table_name: Name of the BigQuery table to use.
+      dataset: Name of the dataset to use.
+      schema: Dictionary in the format {'column_name': 'bigquery_type'}
+    """
+    super(WriteToBigQuery, self).__init__()
+    self.table_name = table_name
+    self.dataset = dataset
+    self.schema = schema
+
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (col, self.schema[col]) for col in self.schema)
+
+  def get_table(self, pipeline):
+    """Utility to construct an output table reference."""
+    project = pipeline.options.view_as(GoogleCloudOptions).project
+    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+
+  def expand(self, pcoll):
+    table = self.get_table(pcoll.pipeline)
+    return (
+        pcoll
+        | 'ConvertToRow' >> beam.Map(
+            lambda elem: {col: elem[col] for col in self.schema})
+        | beam.io.Write(beam.io.BigQuerySink(
+            table,
+            schema=self.get_schema(),
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+
+
 # [START abuse_detect]
 class CalculateSpammyUsers(beam.PTransform):
   """Filter out all but those users with a high clickrate, which we will
@@ -243,8 +280,7 @@ def run(argv=None):
   options = PipelineOptions(pipeline_args)
 
   # We also require the --project option to access --dataset
-  project = options.view_as(GoogleCloudOptions).project
-  if project is None:
+  if options.view_as(GoogleCloudOptions).project is None:
     parser.print_usage()
     print(sys.argv[0] + ': error: argument --project is required')
     sys.exit(1)
@@ -260,8 +296,6 @@ def run(argv=None):
   # Enforce that this pipeline is always run in streaming mode
   options.view_as(StandardOptions).streaming = True
 
-  table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name)
-
   with beam.Pipeline(options=options) as p:
     # Read events from Pub/Sub using custom timestamps
     raw_events = (
@@ -298,13 +332,6 @@ def run(argv=None):
     # updates for late data. Uses the side input derived above --the set of
     # suspected robots-- to filter out scores from those users from the sum.
     # Write the results to BigQuery.
-    team_table_spec = table_spec_prefix + '_teams'
-    team_table_schema = (
-        'team:STRING, '
-        'total_score:INTEGER, '
-        'window_start:STRING, '
-        'processing_time: STRING')
-
     (raw_events  # pylint: disable=expression-not-assigned
      | 'WindowIntoFixedWindows' >> beam.WindowInto(
          beam.window.FixedWindows(fixed_window_duration))
@@ -317,20 +344,19 @@ def run(argv=None):
      | 'ExtractAndSumScore' >> ExtractAndSumScore('team')
      # [END filter_and_calc]
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
-     | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
-         team_table_spec,
-         schema=team_table_schema,
-         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+     | 'WriteTeamScoreSums' >> WriteToBigQuery(
+         args.table_name + '_teams', args.dataset, {
+             'team': 'STRING',
+             'total_score': 'INTEGER',
+             'window_start': 'STRING',
+             'processing_time': 'STRING',
+         }))
 
     # [START session_calc]
     # Detect user sessions-- that is, a burst of activity separated by a gap
     # from further activity. Find and record the mean session lengths.
     # This information could help the game designers track the changing user
     # engagement as their set of game changes.
-    session_table_spec = table_spec_prefix + '_sessions'
-    session_table_schema = 'mean_duration:FLOAT'
-
     (user_events  # pylint: disable=expression-not-assigned
      | 'WindowIntoSessions' >> beam.WindowInto(
          beam.window.Sessions(session_gap),
@@ -355,11 +381,10 @@ def run(argv=None):
      | beam.CombineGlobally(beam.combiners.MeanCombineFn()).without_defaults()
      | 'FormatAvgSessionLength' >> beam.Map(
          lambda elem: {'mean_duration': float(elem)})
-     | 'WriteAvgSessionLength' >> beam.io.WriteToBigQuery(
-         session_table_spec,
-         schema=session_table_schema,
-         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+     | 'WriteAvgSessionLength' >> WriteToBigQuery(
+         args.table_name + '_sessions', args.dataset, {
+             'mean_duration': 'FLOAT',
+         }))
      # [END rewindow]
 
 
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index 6e826d45fee..b286a6a5ddf 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -156,6 +156,43 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
     }
 
 
+class WriteToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information."""
+  def __init__(self, table_name, dataset, schema):
+    """Initializes the transform.
+    Args:
+      table_name: Name of the BigQuery table to use.
+      dataset: Name of the dataset to use.
+      schema: Dictionary in the format {'column_name': 'bigquery_type'}
+    """
+    super(WriteToBigQuery, self).__init__()
+    self.table_name = table_name
+    self.dataset = dataset
+    self.schema = schema
+
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (col, self.schema[col]) for col in self.schema)
+
+  def get_table(self, pipeline):
+    """Utility to construct an output table reference."""
+    project = pipeline.options.view_as(GoogleCloudOptions).project
+    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+
+  def expand(self, pcoll):
+    table = self.get_table(pcoll.pipeline)
+    return (
+        pcoll
+        | 'ConvertToRow' >> beam.Map(
+            lambda elem: {col: elem[col] for col in self.schema})
+        | beam.io.Write(beam.io.BigQuerySink(
+            table,
+            schema=self.get_schema(),
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+
+
 # [START main]
 class HourlyTeamScore(beam.PTransform):
   def __init__(self, start_min, stop_min, window_duration):
@@ -241,8 +278,7 @@ def run(argv=None):
   options = PipelineOptions(pipeline_args)
 
   # We also require the --project option to access --dataset
-  project = options.view_as(GoogleCloudOptions).project
-  if project is None:
+  if options.view_as(GoogleCloudOptions).project is None:
     parser.print_usage()
     print(sys.argv[0] + ': error: argument --project is required')
     sys.exit(1)
@@ -251,23 +287,18 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   options.view_as(SetupOptions).save_main_session = True
 
-  table_spec = '{}:{}.{}'.format(project, args.dataset, args.table_name)
-  table_schema = (
-      'team:STRING, '
-      'total_score:INTEGER, '
-      'window_start:STRING')
-
   with beam.Pipeline(options=options) as p:
     (p  # pylint: disable=expression-not-assigned
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)
      | 'HourlyTeamScore' >> HourlyTeamScore(
          args.start_min, args.stop_min, args.window_duration)
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
-     | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
-         table_spec,
-         schema=table_schema,
-         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+     | 'WriteTeamScoreSums' >> WriteToBigQuery(
+         args.table_name, args.dataset, {
+             'team': 'STRING',
+             'total_score': 'INTEGER',
+             'window_start': 'STRING',
+         }))
 # [END main]
 
 
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 0d1fce47663..e207f26712e 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -171,6 +171,43 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
     }
 
 
+class WriteToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information."""
+  def __init__(self, table_name, dataset, schema):
+    """Initializes the transform.
+    Args:
+      table_name: Name of the BigQuery table to use.
+      dataset: Name of the dataset to use.
+      schema: Dictionary in the format {'column_name': 'bigquery_type'}
+    """
+    super(WriteToBigQuery, self).__init__()
+    self.table_name = table_name
+    self.dataset = dataset
+    self.schema = schema
+
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (col, self.schema[col]) for col in self.schema)
+
+  def get_table(self, pipeline):
+    """Utility to construct an output table reference."""
+    project = pipeline.options.view_as(GoogleCloudOptions).project
+    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+
+  def expand(self, pcoll):
+    table = self.get_table(pcoll.pipeline)
+    return (
+        pcoll
+        | 'ConvertToRow' >> beam.Map(
+            lambda elem: {col: elem[col] for col in self.schema})
+        | beam.io.Write(beam.io.BigQuerySink(
+            table,
+            schema=self.get_schema(),
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+
+
 # [START window_and_trigger]
 class CalculateTeamScores(beam.PTransform):
   """Calculates scores for each team within the configured window duration.
@@ -257,8 +294,7 @@ def run(argv=None):
   options = PipelineOptions(pipeline_args)
 
   # We also require the --project option to access --dataset
-  project = options.view_as(GoogleCloudOptions).project
-  if project is None:
+  if options.view_as(GoogleCloudOptions).project is None:
     parser.print_usage()
     print(sys.argv[0] + ': error: argument --project is required')
     sys.exit(1)
@@ -270,8 +306,6 @@ def run(argv=None):
   # Enforce that this pipeline is always run in streaming mode
   options.view_as(StandardOptions).streaming = True
 
-  table_spec_prefix = '{}:{}.{}'.format(project, args.dataset, args.table_name)
-
   with beam.Pipeline(options=options) as p:
     # Read game events from Pub/Sub using custom timestamps, which are extracted
     # from the pubsub data elements, and parse the data.
@@ -282,37 +316,32 @@ def run(argv=None):
         | 'AddEventTimestamps' >> beam.Map(
             lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])))
 
-    team_table_spec = table_spec_prefix + '_teams'
-    team_table_schema = (
-        'team:STRING, '
-        'total_score:INTEGER, '
-        'window_start:STRING, '
-        'processing_time: STRING')
-
     # Get team scores and write the results to BigQuery
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateTeamScores' >> CalculateTeamScores(
          args.team_window_duration, args.allowed_lateness)
      | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
-     | 'WriteTeamScoreSums' >> beam.io.WriteToBigQuery(
-         team_table_spec,
-         schema=team_table_schema,
-         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
-
-    user_table_spec = table_spec_prefix + '_users'
-    user_table_schema = 'user:STRING, total_score:INTEGER'
+     | 'WriteTeamScoreSums' >> WriteToBigQuery(
+         args.table_name + '_teams', args.dataset, {
+             'team': 'STRING',
+             'total_score': 'INTEGER',
+             'window_start': 'STRING',
+             'processing_time': 'STRING',
+         }))
+
+    def format_user_score_sums(user_score):
+      (user, score) = user_score
+      return {'user': user, 'total_score': score}
 
     # Get user scores and write the results to BigQuery
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
-     | 'FormatUserScoreSums' >> beam.Map(
-         lambda elem: {'user': elem[0], 'total_score': elem[1]})
-     | 'WriteUserScoreSums' >> beam.io.WriteToBigQuery(
-         user_table_spec,
-         schema=user_table_schema,
-         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+     | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
+     | 'WriteUserScoreSums' >> WriteToBigQuery(
+         args.table_name + '_users', args.dataset, {
+             'user': 'STRING',
+             'total_score': 'INTEGER',
+         }))
 
 
 if __name__ == '__main__':


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 79161)
            Time Spent: 10m
    Remaining Estimate: 0h

> BQ sink fails on Direct Runner for 2.4.0 RC2
> --------------------------------------------
>
>                 Key: BEAM-3824
>                 URL: https://issues.apache.org/jira/browse/BEAM-3824
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.4.0
>            Reporter: Valentyn Tymofieiev
>            Assignee: David Cavazos
>            Priority: Blocker
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> python -m apache_beam.examples.complete.game.hourly_team_score --project=$PROJECT --dataset=beam_release_2_4_0
--input=gs://$BUCKET/mobile/first_5000_gaming_data.csv
> The pipeline fails with:
> INFO:root:finish <DoOperation WriteTeamScoreSums/WriteToBigQuery output_tags=['out'],
receivers=[ConsumerSet[WriteTeamScoreSums/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder],
len(consumers)=0]]> 
> Traceback (most recent call last):
>  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
>  "__main__", fname, loader, pkg_name) 
>  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
>  exec code in run_globals 
>  File "/tmp/release_testing/r2.4.0_env/lib/python2.7/site-packages/apache_beam/examples/complete/game/hourly_team_score.py",
line 276, in <
> module> 
>  run() 
>  File "/tmp/release_testing/r2.4.0_env/lib/python2.7/site-packages/apache_beam/examples/complete/game/hourly_team_score.py",
line 270, in r
> un 
>  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) 
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 389, in __exit__
>  self.run().wait_until_finish() 
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 369, in run
>  self.to_runner_api(), self.runner, self._options).run(False) 
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 382, in run
>  return self.runner.run_pipeline(self) 
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
line 129, in run_pip
> eline 
>  return runner.run_pipeline(pipeline)
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 215, in ru
> n_pipeline 
>  return self.run_via_runner_api(pipeline.to_runner_api())
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 218, in ru
> n_via_runner_api 
>  return self.run_stages(*self.create_stages(pipeline_proto))
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 837, in ru
> n_stages 
>  pcoll_buffers, safe_coders).process_bundle.metrics
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 938, in ru
> n_stage 
>  self._progress_frequency).process_bundle(data_input, data_output)
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 1110, in p
> rocess_bundle 
>  result_future = self._controller.control_handler.push(process_bundle)
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 1003, in p
> ush 
>  response = self.worker.do_instruction(request)
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 185, in do_instruc
> tion 
>  request.instruction_id) 
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 202, in process_bu
> ndle 
>  processor.process_bundle(instruction_id)
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 298, in proc
> ess_bundle 
>  op.finish() 
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/operations.py",
line 389, in finish
>  self.dofn_runner.finish()
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/common.py",
line 517, in finish
>  self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/common.py",
line 508, in _invoke_bundle_method
>  self._reraise_augmented(exn) 
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/common.py",
line 537, in _reraise_augmented
>  six.raise_from(new_exn, original_traceback)
>  File "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/six.py", line
737, in raise_from
>  raise value 
> RuntimeError: Could not successfully insert rows to BigQuery table [google.com:clouddfe:beam_release_2_4_0.leader_board].
Errors: [<InsertEr
> rorsValueListEntry
>  errors: [<ErrorProto 
>  debugInfo: u'' 
>  location: u'processing_time' 
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 0>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 1>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 2>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 3>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
> ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message