beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: [BEAM-1715] Fix Python WordCount on Dataflow Mismatch
Date Thu, 23 Mar 2017 00:34:38 GMT
Repository: beam
Updated Branches:
  refs/heads/master 65135fd7a -> 70efdd0fe


[BEAM-1715] Fix Python WordCount on Dataflow Mismatch


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa6f5f0f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa6f5f0f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa6f5f0f

Branch: refs/heads/master
Commit: fa6f5f0f45a3cb8343d0a30dac8f75a8097d65d1
Parents: 65135fd
Author: Mark Liu <markliu@google.com>
Authored: Tue Mar 14 12:43:45 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Wed Mar 22 17:34:18 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_it_test.py   |  5 +++-
 .../apache_beam/tests/pipeline_verifiers.py     | 31 +++++++++++++++++++-
 .../tests/pipeline_verifiers_test.py            | 21 +++++++++++++
 sdks/python/run_postcommit.sh                   |  3 +-
 4 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 77926bb..1c700b6 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -43,9 +43,12 @@ class WordCountIT(unittest.TestCase):
     output = '/'.join([test_pipeline.get_option('output'),
                        test_pipeline.get_option('job_name'),
                        'results'])
+    arg_sleep_secs = test_pipeline.get_option('sleep_secs')
+    sleep_secs = int(arg_sleep_secs) if arg_sleep_secs is not None else None
     pipeline_verifiers = [PipelineStateMatcher(),
                           FileChecksumMatcher(output + '*-of-*',
-                                              self.DEFAULT_CHECKSUM)]
+                                              self.DEFAULT_CHECKSUM,
+                                              sleep_secs)]
     extra_opts = {'output': output,
                   'on_success_matcher': all_of(*pipeline_verifiers)}
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 0d6814e..3cac658 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -23,6 +23,7 @@ of test pipeline job. Customized verifier should extend
 """
 
 import logging
+import time
 
 from hamcrest.core.base_matcher import BaseMatcher
 
@@ -79,7 +80,27 @@ class FileChecksumMatcher(BaseMatcher):
   is a hash string computed from content of file(s).
   """
 
-  def __init__(self, file_path, expected_checksum):
+  def __init__(self, file_path, expected_checksum, sleep_secs=None):
+    """Initialize a FileChecksumMatcher object
+
+    Args:
+      file_path : A string that is the full path of output file. This path
+        can contain globs.
+      expected_checksum : A hash string that is computed from expected
+        result.
+      sleep_secs : Number of seconds to wait before verification starts.
+        Extra time is given to make sure output files are ready on FS.
+    """
+    if sleep_secs is not None:
+      if isinstance(sleep_secs, int):
+        self.sleep_secs = sleep_secs
+      else:
+        raise ValueError('Sleep seconds, if received, must be int. '
+                         'But received: %r, %s' % (sleep_secs,
+                                                   type(sleep_secs)))
+    else:
+      self.sleep_secs = None
+
     self.file_path = file_path
     self.file_system = get_filesystem(self.file_path)
     self.expected_checksum = expected_checksum
@@ -94,6 +115,9 @@ class FileChecksumMatcher(BaseMatcher):
     matched_path = [f.path for f in match_result.metadata_list]
     if not matched_path:
       raise IOError('No such file or directory: %s' % self.file_path)
+
+    logging.info('Find %d files in %s: \n%s',
+                 len(matched_path), self.file_path, '\n'.join(matched_path))
     for path in matched_path:
       with self.file_system.open(path, 'r') as f:
         for line in f:
@@ -101,6 +125,11 @@ class FileChecksumMatcher(BaseMatcher):
     return read_lines
 
   def _matches(self, _):
+    if self.sleep_secs:
+      # Wait to have output file ready on FS
+      logging.info('Wait %d seconds...', self.sleep_secs)
+      time.sleep(self.sleep_secs)
+
     # Read from given file(s) path
     read_lines = self._read_with_retry()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index af8f441..909917d 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -121,6 +121,27 @@ class PipelineVerifiersTest(unittest.TestCase):
     self.assertTrue(mock_match.called)
     self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
 
+  def test_file_checksum_matchcer_invalid_sleep_time(self):
+    with self.assertRaises(ValueError) as cm:
+      verifiers.FileChecksumMatcher('file_path',
+                                    'expected_checksum',
+                                    'invalid_sleep_time')
+    self.assertEqual(cm.exception.message,
+                     'Sleep seconds, if received, must be int. '
+                     'But received: \'invalid_sleep_time\', '
+                     '<type \'str\'>')
+
+  @patch('time.sleep', return_value=None)
+  def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep):
+    temp_dir = tempfile.mkdtemp()
+    case = self.test_cases[0]
+    self.create_temp_file(case['content'], temp_dir)
+    matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+                                            case['expected_checksum'],
+                                            10)
+    hc_assert_that(self._mock_result, matcher)
+    self.assertTrue(mocked_sleep.called)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/run_postcommit.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 4d17942..50338e2 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -101,4 +101,5 @@ python setup.py nosetests \
     --output=$GCS_LOCATION/py-wordcount-cloud/output \
     --sdk_location=$SDK_LOCATION \
     --job_name=$JOBNAME_E2E_WC \
-    --num_workers=1"
+    --num_workers=1 \
+    --sleep_secs=20"


Mime
View raw message