beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3883) Python SDK stages artifacts when talking to job server
Date Wed, 16 May 2018 23:47:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=102719&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102719
]

ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/May/18 23:46
            Start Date: 16/May/18 23:46
    Worklog Time Spent: 10m 
      Work Description: tvalentyn commented on a change in pull request #5251: [BEAM-3883]
Refactor and clean dependency.py to make it reusable with artifact service
URL: https://github.com/apache/beam/pull/5251#discussion_r188799057
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/stager_test.py
 ##########
 @@ -420,134 +375,167 @@ def test_sdk_location_local_directory_not_present(self):
     sdk_location = 'nosuchdir'
     with self.assertRaises(RuntimeError) as cm:
       options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
       self.update_options(options)
       options.view_as(SetupOptions).sdk_location = sdk_location
 
-      dependency.stage_job_resources(options)
+      self.stager.stage_job_resources(options, staging_location=staging_dir)
     self.assertEqual(
         'The file "%s" cannot be found. Its '
         'location was specified by the --sdk_location command-line option.' %
-        sdk_location,
-        cm.exception.args[0])
+        sdk_location, cm.exception.args[0])
 
-  def test_sdk_location_gcs_source_file(self):
+  def test_sdk_location_remote_source_file(self):
     staging_dir = self.make_temp_dir()
     sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    with mock.patch('apache_beam.runners.dataflow.internal.'
-                    'dependency._dependency_file_copy'):
-      self.assertEqual(
-          [names.DATAFLOW_SDK_TARBALL_FILE],
-          dependency.stage_job_resources(options))
-
-  def test_sdk_location_gcs_wheel_file(self):
+    with mock.patch('.'.join([
+        self.__module__, TestStager.__name__, TestStager.stage_artifact.__name__
+    ])):
+      with mock.patch('.'.join([
+          self.__module__, TestStager.__name__,
+          TestStager._download_file.__name__
+      ])):
+        self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+                         self.stager.stage_job_resources(
+                             options, staging_location=staging_dir))
+
+  def test_sdk_location_remote_wheel_file(self):
     staging_dir = self.make_temp_dir()
     sdk_filename = 'apache_beam-1.0.0-cp27-cp27mu-manylinux1_x86_64.whl'
-    sdk_location = 'gs://my-gcs-bucket/' + sdk_filename
+    sdk_location = '/tmp/remote/my-bucket/' + sdk_filename
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    with mock.patch('apache_beam.runners.dataflow.internal.'
-                    'dependency._dependency_file_copy'):
-      self.assertEqual(
-          [sdk_filename],
-          dependency.stage_job_resources(options))
+    # We can not rely on actual remote file systems paths hence making
+    # '/tmp/remote/' a new remote path.
+    def is_remote_path(dummy_self, path):
+      return path.startswith('/tmp/remote/')
+
+    with mock.patch('.'.join([
+        self.__module__, TestStager.__name__, TestStager.stage_artifact.__name__
+    ])):
+      with mock.patch('.'.join([
+          self.__module__, TestStager.__name__,
+          TestStager._download_file.__name__
+      ])):
+        with mock.patch(
+            '.'.join([
+                self.__module__, TestStager.__name__,
+                TestStager._is_remote_path.__name__
+            ]), is_remote_path):
+          self.assertEqual([sdk_filename],
+                           self.stager.stage_job_resources(
+                               options, staging_location=staging_dir))
 
   def test_sdk_location_http(self):
     staging_dir = self.make_temp_dir()
     sdk_location = 'http://storage.googleapis.com/my-gcs-bucket/tarball.tar.gz'
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    def file_download(_, to_folder):
-      tarball_path = os.path.join(to_folder, 'sdk-tarball')
-      with open(tarball_path, 'w') as f:
+    def file_download(dummy_self, _, to_path):
+      with open(to_path, 'w') as f:
         f.write('Package content.')
-      return tarball_path
+      return to_path
 
-    with mock.patch('apache_beam.runners.dataflow.internal.'
-                    'dependency._dependency_file_download', file_download):
-      self.assertEqual(
-          [names.DATAFLOW_SDK_TARBALL_FILE],
-          dependency.stage_job_resources(options))
-
-    tarball_path = os.path.join(
-        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    with mock.patch(
+        '.'.join([
+            self.__module__, TestStager.__name__,
+            TestStager._download_file.__name__
+        ]), file_download):
+      self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+                       self.stager.stage_job_resources(
+                           options, staging_location=staging_dir))
+
+    tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
 
   def test_with_extra_packages(self):
     staging_dir = self.make_temp_dir()
     source_dir = self.make_temp_dir()
+    self.create_temp_file(os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
+    self.create_temp_file(os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
+    self.create_temp_file(os.path.join(source_dir, 'xyz2.tar'), 'nothing')
+    self.create_temp_file(os.path.join(source_dir, 'whl.whl'), 'nothing')
     self.create_temp_file(
-        os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz2.tar'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'whl.whl'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
+        os.path.join(source_dir, stager.EXTRA_PACKAGES_FILE), 'nothing')
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).extra_packages = [
         os.path.join(source_dir, 'abc.tar.gz'),
         os.path.join(source_dir, 'xyz.tar.gz'),
         os.path.join(source_dir, 'xyz2.tar'),
-        os.path.join(source_dir, 'whl.whl'),
-        'gs://my-gcs-bucket/gcs.tar.gz']
+        os.path.join(source_dir, 'whl.whl'), '/tmp/remote/remote_file.tar.gz'
+    ]
 
-    gcs_copied_files = []
+    remote_copied_files = []
 
-    def file_copy(from_path, to_path):
-      if from_path.startswith('gs://'):
-        gcs_copied_files.append(from_path)
+    # We can not rely on actual remote file systems paths hence making
+    # '/tmp/remote/' a new remote path.
+    def is_remote_path(dummy_self, path):
+      return path.startswith('/tmp/remote/')
+
+    def file_copy(dummy_self, from_path, to_path):
+      if is_remote_path(dummy_self, from_path):
+        remote_copied_files.append(from_path)
         _, from_name = os.path.split(from_path)
         if os.path.isdir(to_path):
           to_path = os.path.join(to_path, from_name)
         self.create_temp_file(to_path, 'nothing')
-        logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
-      elif to_path.startswith('gs://'):
-        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+        logging.info('Fake copied remote file: %s to %s', from_path, to_path)
+      elif is_remote_path(dummy_self, to_path):
+        logging.info('Faking upload_file(%s, %s)', from_path, to_path)
       else:
         shutil.copyfile(from_path, to_path)
 
-    dependency._dependency_file_copy = file_copy
-
-    self.assertEqual(
-        ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl', 'gcs.tar.gz',
-         dependency.EXTRA_PACKAGES_FILE],
-        dependency.stage_job_resources(options))
-    with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
-      self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n',
-                        'whl.whl\n', 'gcs.tar.gz\n'], f.readlines())
-    self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
+    with mock.patch(
+        '.'.join([
+            self.__module__, TestStager.__name__,
+            TestStager.stage_artifact.__name__
+        ]), file_copy):
 
 Review comment:
   Can we use a fake implementation of stage_artifact that we implemented in TestStager? Why
do we have to mock it here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 102719)

> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
>                 Key: BEAM-3883
>                 URL: https://issues.apache.org/jira/browse/BEAM-3883
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 12h 40m
>  Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or dependencies when
talking to the job API. Artifacts that need to be staged include the user code itself, any
SDK components not included in the container image, and the list of Python packages that must
be installed at runtime.
>  
> Artifacts that are currently expected can be found in the harness boot code: [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52.]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message