beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-2810) Consider a faster Avro library in Python
Date Thu, 28 Jun 2018 01:03:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2810?focusedWorklogId=116710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116710 ]

ASF GitHub Bot logged work on BEAM-2810:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jun/18 01:02
            Start Date: 28/Jun/18 01:02
    Worklog Time Spent: 10m 
      Work Description: ryan-williams commented on a change in pull request #5496: [BEAM-2810] use fastavro in Avro IO
URL: https://github.com/apache/beam/pull/5496#discussion_r198683235
 
 

 ##########
 File path: sdks/python/apache_beam/io/avroio.py
 ##########
 @@ -377,6 +407,56 @@ def split_points_unclaimed(stop_position):
           yield record
 
 
+class _FastAvroSource(filebasedsource.FileBasedSource):
+  """A source for reading Avro files using the `fastavro` library.
+
+  ``_FastAvroSource`` is implemented using the file-based source framework
+  available in module 'filebasedsource'. Hence please refer to module
+  'filebasedsource' to fully understand how this source implements operations
+  common to all file-based sources such as file-pattern expansion and splitting
+  into bundles for parallel processing.
+
+  TODO: remove ``_AvroSource`` in favor of using ``_FastAvroSource``
+  everywhere once it has been more widely tested
+  """
+
+  def read_records(self, file_name, range_tracker):
+    next_block_start = -1
+
+    def split_points_unclaimed(stop_position):
+      if next_block_start >= stop_position:
+        # Next block starts at or after the suggested stop position. Hence
+        # there will not be split points to be claimed for the range ending at
+        # suggested stop position.
+        return 0
+
+      return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN
+
+    range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
+
+    start_offset = range_tracker.start_position()
+    if start_offset is None:
+      start_offset = 0
+
+    with self.open_file(file_name) as f:
+      blocks = block_reader(f)
+      sync_marker = blocks._header['sync']
+
+      # We have to start at current position if previous bundle ended at the
+      # end of a sync marker.
+      start_offset = max(0, start_offset - len(sync_marker))
+      f.seek(start_offset)
+      _AvroUtils.advance_file_past_next_sync_marker(f, sync_marker)
+
+      next_block_start = f.tell()
+
+      while range_tracker.try_claim(next_block_start):
+        block = next(blocks)
+        next_block_start = f.tell()
 
 Review comment:
   Good point!

   I've now changed `next_block_start` to be updated to `block.offset + block.size`, as `_AvroSource` was already doing, and also made `_AvroSource` mirror `_FastAvroSource` by passing `next_block_start` to `range_tracker.try_claim`.

   This should alleviate concerns about read-ahead (which can leave `f.tell()` past the end of the block just read) interfering with these calculations.
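The read-ahead concern in the review comment can be reproduced with a toy, length-prefixed block format instead of real Avro. All names here (`Block`, `read_ahead_blocks`, `ToyRangeTracker`, `read_range`) are hypothetical and are not Beam's or fastavro's API. The toy reader reads the next block's length byte before yielding the current block, so `f.tell()` lands one byte past the block it just returned, while `block.offset + block.size` stays exact.

```python
import io
from collections import namedtuple

# Hypothetical block record: start offset and byte length of one block,
# mirroring the `block.offset + block.size` idea from the review comment.
Block = namedtuple("Block", ["offset", "size", "payload"])

# Toy file of length-prefixed blocks starting at offsets 0, 4 and 7.
DATA = b"\x03abc" + b"\x02de" + b"\x04fghi"


def read_ahead_blocks(f):
    """Yield blocks, but read the next block's length byte *before* yielding
    the current block, the way a buffering reader might read ahead."""
    offset = f.tell()
    header = f.read(1)
    while header:
        payload = f.read(header[0])
        block = Block(offset=offset, size=1 + len(payload), payload=payload)
        offset = f.tell()
        header = f.read(1)  # read-ahead: file position is now past this block
        yield block


class ToyRangeTracker:
    """Hypothetical stand-in for a range tracker over [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def try_claim(self, position):
        # A block belongs to this range only if it starts inside it.
        return self.start <= position < self.stop


def read_range(data, start, stop, use_tell):
    """Read the blocks claimed by [start, stop); start must be a block start."""
    f = io.BytesIO(data)
    f.seek(start)
    blocks = read_ahead_blocks(f)
    tracker = ToyRangeTracker(start, stop)
    next_block_start = f.tell()
    payloads = []
    while tracker.try_claim(next_block_start):
        block = next(blocks, None)
        if block is None:  # end of file
            break
        payloads.append(block.payload)
        # f.tell() is skewed by the read-ahead; offset + size is exact.
        next_block_start = f.tell() if use_tell else block.offset + block.size
    return payloads


print(read_range(DATA, 0, 5, use_tell=True))   # [b'abc']
print(read_range(DATA, 0, 5, use_tell=False))  # [b'abc', b'de']
```

For the range `[0, 5)`, the block at offset 4 starts inside the range and should be read by it. Using `f.tell()`, the reader sees position 5 after the first block and stops claiming, so that block is dropped; using `offset + size` yields 4 and the block is kept.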

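The sync-marker alignment in the diff (back up by `len(sync_marker)`, then advance past the next marker) can be sketched on an in-memory byte string. The helper names and the 6-byte marker are assumptions for illustration. Real Avro files use a random 16-byte marker per file, and this naive `bytes.find` search would misfire if the marker bytes happened to appear inside a payload.

```python
# Hypothetical 6-byte marker for readability; real Avro files use a random
# 16-byte sync marker chosen per file.
SYNC = b"#SYNC#"

# Toy file: header, then blocks, each followed by the sync marker.
# Block payloads start at offsets 9 and 19.
FILE_BYTES = b"HDR" + SYNC + b"aaaa" + SYNC + b"bbb" + SYNC


def advance_past_next_sync(data, pos, sync=SYNC):
    """Position just after the first sync marker found at or after pos,
    or len(data) if there is none."""
    i = data.find(sync, pos)
    return len(data) if i < 0 else i + len(sync)


def first_block_start(data, range_start, sync=SYNC):
    """Back up by len(sync) so that a block starting exactly at range_start
    (i.e. right after a marker that ends there) is still found."""
    return advance_past_next_sync(data, max(0, range_start - len(sync)), sync)


def naive_first_block_start(data, range_start, sync=SYNC):
    """Same search without backing up: skips a block starting at range_start."""
    return advance_past_next_sync(data, range_start, sync)


def blocks_in_range(data, start, stop, sync=SYNC):
    """Start offsets of the blocks a reader for [start, stop) would claim."""
    claimed = []
    pos = first_block_start(data, start, sync)
    while pos < stop and pos < len(data):
        claimed.append(pos)
        pos = advance_past_next_sync(data, pos, sync)
    return claimed


print(first_block_start(FILE_BYTES, 9), naive_first_block_start(FILE_BYTES, 9))  # 9 19
print(blocks_in_range(FILE_BYTES, 0, 10), blocks_in_range(FILE_BYTES, 10, len(FILE_BYTES)))  # [9] [19]
```

Backing up is what lets a range that starts exactly at offset 9 still claim the block at 9; without it the search lands at 19 and that block would be read by no range. With the back-up, splitting this toy file at any offset `k` into `[0, k)` and `[k, len)` assigns each block to exactly one side.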
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 116710)
    Time Spent: 4h 20m  (was: 4h 10m)

> Consider a faster Avro library in Python
> ----------------------------------------
>
>                 Key: BEAM-2810
>                 URL: https://issues.apache.org/jira/browse/BEAM-2810
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> https://stackoverflow.com/questions/45870789/bottleneck-on-data-source
> It seems this job is reading Avro files (exported by BigQuery) at about 2 MB/s.
> We use the standard Python "avro" library, which is apparently known to be very slow (10x+ slower than Java; see http://apache-avro.679487.n3.nabble.com/Avro-decode-very-slow-in-Python-td4034422.html), and there are alternatives, e.g. https://pypi.python.org/pypi/fastavro/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
